www.digitalmars.com         C & C++   DMDScript  

digitalmars.D - How to best translate this C++ algorithm into D?

reply "logicchains" <jonathan.t.barnard gmail.com> writes:
I came across an interesting C++ program[1] in a blog post[2] 
that I wanted to try and translate into D. My attempt[3] is 
however still significantly slower than the C++ version; with 
input 100 100 100, for instance, it runs in around 200 ms, 
compared to 60 ms for the C++ version (it however nevertheless 
much faster than my naive C implementation, which runs in around 
2 seconds for that input). That's using the LLVM compiler for 
each language, btw.

The C++ algorithm is:

   std::vector<forest_t> meal(const std::vector<forest_t>& 
forests) {
     static const std::array<forest_t,3> possible_meals = {{
	forest_t{{-1,-1,+1}},
	forest_t{{-1,+1,-1}},
	forest_t{{+1,-1,-1}}
       }};
     // apply possible meals to all forests
     std::vector<forest_t> next_forests;
     next_forests.reserve(forests.size() * possible_meals.size());
     for (auto meal: possible_meals) {
       forest_t next_forest;
       std::transform(std::begin(forests), std::end(forests),
		     std::back_inserter(next_forests),
		     [&](const forest_t& forest) {
		       std::transform(std::begin(forest), std::end(forest),
				      std::begin(meal), std::begin(next_forest), 
std::plus<int>());
		       return next_forest;
		     });
     }
     // filter valid forests
     auto valid_end = std::remove_if(std::begin(next_forests), 
std::end(next_forests),
				    forest_invalid);
     // delete duplicates
     std::stable_sort(std::begin(next_forests), valid_end);
     next_forests.erase(
		       std::unique(std::begin(next_forests), valid_end), 
std::end(next_forests));
     return next_forests;
   }

While my (much more concise; thanks D!) attempt at implementing 
it is:

forest_t[] meal(in forest_t[] forests) {
   forest_t[3] possible_meals = [{-1, -1, +1}, {-1, +1, -1}, {+1, 
-1, -1}];
   return map!(a => [possible_meals[0]+a, possible_meals[1]+a, 
possible_meals[2]+a])(forests)
     .join
     .filter!(a => !forest_invalid(a)).array
     .sort.uniq.array;
}

Any suggestions for how I could make the D code do the same as 
the C++ standard transform? Some sort of flatmap could be useful, 
to avoid the need for a join. It'd also be nice if there was a 
way to sort/uniq the filterresults directly without having to 
convert to an array first.

1. 
http://www.unisoftwareplus.com/download/blog/2014-06/magic_forest.cpp
2. 
http://unriskinsight.blogspot.com.au/2014/06/fast-functional-goats-lions-and-wolves.html
3. https://gist.github.com/logicchains/2442dfca5d517f1e4bab
Jun 06 2014
next sibling parent Timon Gehr <timon.gehr gmx.ch> writes:
On 06/07/2014 05:50 AM, logicchains wrote:
 While my (much more concise; thanks D!) attempt at implementing it is:

 forest_t[] meal(in forest_t[] forests) {
    forest_t[3] possible_meals = [{-1, -1, +1}, {-1, +1, -1}, {+1, -1, -1}];
    return map!(a => [possible_meals[0]+a, possible_meals[1]+a,
possible_meals[2]+a])(forests)
      .join
      .filter!(a => !forest_invalid(a)).array
      .sort.uniq.array;
 }

 Any suggestions for how I could make the D code do the same as the C++
 standard transform? Some sort of flatmap could be useful, to avoid the
 need for a join.
You could use map instead of an explicit array literal doing manual mapping eagerly and I'm quite confident this accounts for much of the performance difference.
 It'd also be nice if there was a way to sort/uniq the
 filterresults directly without having to convert to an array first.
You could use 'partition' instead of 'filter'.
Jun 07 2014
prev sibling next sibling parent "monarch_dodra" <monarchdodra gmail.com> writes:
On Saturday, 7 June 2014 at 03:50:17 UTC, logicchains wrote:
       std::transform(std::begin(forests), std::end(forests),
 		     std::back_inserter(next_forests),
 		     [&](const forest_t& forest) {
 		       std::transform(std::begin(forest), std::end(forest),
 				      std::begin(meal), std::begin(next_forest), 
 std::plus<int>());
 		       return next_forest;
 		     });
     }
You translate this chunk into:
 forest_t[] meal(in forest_t[] forests) {
   forest_t[3] possible_meals = [{-1, -1, +1}, {-1, +1, -1}, 
 {+1, -1, -1}];
   return map!(a => [possible_meals[0]+a, possible_meals[1]+a, 
 possible_meals[2]+a])(forests)
     .join
First, "[possible_meals[0]+a, ...]" will allocate a new slice *every* *single* *time*. Also, you probably want "joiner" rather than straight up "join". Although "join|er" as often proved slow in benchmarks. I have to run, so I can't suggest what to use instead, but I still thought I'd point it out.
Jun 07 2014
prev sibling parent reply "deadalnix" <deadalnix gmail.com> writes:
On Saturday, 7 June 2014 at 03:50:17 UTC, logicchains wrote:
 While my (much more concise; thanks D!) attempt at implementing 
 it is:

 forest_t[] meal(in forest_t[] forests) {
   forest_t[3] possible_meals = [{-1, -1, +1}, {-1, +1, -1}, 
 {+1, -1, -1}];
   return map!(a => [possible_meals[0]+a, possible_meals[1]+a, 
 possible_meals[2]+a])(forests)
     .join
     .filter!(a => !forest_invalid(a)).array
     .sort.uniq.array;
 }
Try forests.map!(a => only(possible_meals[0]+a, possible_meals[1]+a,
 possible_meals[2]+a))
Also, avoid the use of possible_meals, instead, use the constants directly.
     .filter!(a => !forest_invalid(a)).array
This may ends up doing many allocations. you could use an array where you preallocate forests.length * 3 items via the reserve function and use it as an output range.
     .sort.uniq.array;
Same here. With minor changes, I think you can get major improvement already.
Jun 07 2014
parent reply "logicchains" <jonathan.t.barnard gmail.com> writes:
On Saturday, 7 June 2014 at 08:27:52 UTC, deadalnix wrote:
 This may ends up doing many allocations. you could use an array 
 where you preallocate forests.length * 3 items via the reserve 
 function and use it as an output range.
I've changed filter to partition, which speeds it up slightly, but I'm not exactly sure how to use a pre-allocated array as an output range. I've tried: forest_t[] newForests; newForests.reserve(forests.length*3); map!(a => [forest_t(-1, -1, +1)+a, forest_t(-1, +1, -1)+a, forest_t(+1, -1, -1)+a])(forests) .join .partition!(forest_invalid) .sort.uniq.copy(newForests); But this leaves newForests with length zero, even though indexing it via for instance newForests[0] gives a correct forest.
 Try
 forests.map!(a => only(possible_meals[0]+a, possible_meals[1]+a,
 possible_meals[2]+a))
'only' doesn't seem to be working on my version of ldc (latest Arch Linux community build); even a simple `auto tst = only("1", "2", "3");` gives a compile error, even though the same thing works fine on dmd. The best I can get is: return map!(a => [forest_t(-1, -1, +1)+a, forest_t(-1, +1, -1)+a, forest_t(+1, -1, -1)+a])(forests) .join .partition!(forest_invalid) .sort.uniq.array;
Jun 07 2014
parent reply Timon Gehr <timon.gehr gmx.ch> writes:
On 06/07/2014 03:04 PM, logicchains wrote:
 ...

   return map!(a => [forest_t(-1, -1, +1)+a, forest_t(-1, +1, -1)+a,
 forest_t(+1, -1, -1)+a])(forests)
    .join
    .partition!(forest_invalid)
    .sort.uniq.array;
What about (untested)?: static forest_t[] possible_meals = [{-1, -1, +1}, {-1, +1, -1}, {+1, -1, -1}]; return forests.map!(a=>possible_meals.map!(b=>b+a)) .join.partition!forest_invalid.sort.uniq.array; (Of course, this still allocates two arrays.)
Jun 07 2014
parent reply "logicchains" <jonathan.t.barnard gmail.com> writes:
On Saturday, 7 June 2014 at 13:25:07 UTC, Timon Gehr wrote:
 What about (untested)?:

 static forest_t[] possible_meals = [{-1, -1, +1}, {-1, +1, -1}, 
 {+1, -1, -1}];
 return forests.map!(a=>possible_meals.map!(b=>b+a))
     .join.partition!forest_invalid.sort.uniq.array;

 (Of course, this still allocates two arrays.)
That seems around 5% slower, for some reason.
Jun 07 2014
parent reply "monarch_dodra" <monarchdodra gmail.com> writes:
On Saturday, 7 June 2014 at 13:31:21 UTC, logicchains wrote:
 On Saturday, 7 June 2014 at 13:25:07 UTC, Timon Gehr wrote:
 What about (untested)?:

 static forest_t[] possible_meals = [{-1, -1, +1}, {-1, +1, 
 -1}, {+1, -1, -1}];
 return forests.map!(a=>possible_meals.map!(b=>b+a))
    .join.partition!forest_invalid.sort.uniq.array;

 (Of course, this still allocates two arrays.)
That seems around 5% slower, for some reason.
Testing with dmd head, with input "300 300 300" (for what it's worth). I was able to identify two "choke points". The first, is building the initial forest. A straight up allocation + loop gave me about a 25% improvement (10s => 8S) forest_t[] arr = uninitializedArray!(forest_t[])(forests.length * 3); size_t j = size_t.max; foreach ( ref const a; forests ) { arr[++j] = forest_t(-1, -1, +1) + a; arr[++j] = forest_t(-1, +1, -1) + a; arr[++j] = forest_t(+1, -1, -1) + a; } Then, you are using the code: .partition!(forest_invalid).sort.uniq.array; The "advantage" of "partition" over "filter" is that "partition" is a greedy inplace algorithm. But then, you go on to use "uniq", which is lazy, and requires "array". I implemented a (very) simplistic in-place "removeDuplicates": T[] removeDuplicates(T)(T[] r) { if (r.empty) return r; size_t i = 0, j = 1; for ( ; j < r.length ; ++j) { if (r[i] != r[j]) r[++i] = r[j]; } return r[0 .. ++i]; } => .partition!(forest_invalid).sort.removeDuplicates; This gave me a some speed up (but nothing huge). Tweaking removeDuplicates to detect "runs at once" could prove to squeeze out more juice but ... ... instead of keeping "true" to your code, there is a faster way than "sort" to remove duplicates. Hashing. The code becomes: bool[forest_t] hash; foreach (tree ; arr.partition!forest_invalid) hash[tree] = true; return hash.keys; This give me about a 90% speedup (8s => 4.5s). Note though that this is not in-place at all. Also, I don't know if having the ouput actually sorted was important for you, or if it was only an implementation detail. In any case, if you want sorted, then you can: return hash.keys.sort(); But at that point, there has to be *some* duplication to justify using the hash table. The downside to all of this, is that the solution is now not-so-functional: forest_t[] meal(forest_t[] forests) { forest_t[] arr = uninitializedArray!(forest_t[])(forests.length * 3); size_t j = size_t.max; foreach ( ref const a; forests ) { arr[++j] = forest_t(-1, -1, +1) + a; arr[++j] = forest_t(-1, +1, -1) + a; arr[++j] = forest_t(+1, -1, -1) + a; } bool[forest_t] hash; foreach (tree ; arr.partition!forest_invalid) hash[tree] = true; return hash.keys; }
Jun 07 2014
next sibling parent reply "logicchains" <jonathan.t.barnard gmail.com> writes:
On Saturday, 7 June 2014 at 15:39:54 UTC, monarch_dodra wrote:
 ... instead of keeping "true" to your code, there is a faster 
 way than "sort" to remove duplicates. Hashing. The code becomes:

     bool[forest_t] hash;
     foreach (tree ; arr.partition!forest_invalid)
         hash[tree] = true;
     return hash.keys;

 This give me about a 90% speedup (8s => 4.5s). Note though that 
 this is not in-place at all. Also, I don't know if having the 
 ouput actually sorted was important for you, or if it was only 
 an implementation detail. In any case, if you want sorted, then 
 you can:
That's excellent, thank you. I didn't realise using a hash table could be so quick. The only reason I sorted it originally was because I was assuming that 'uniq' was like std::unique in that it only removes consecutive duplicates, and so requires the list to be sorted to remove all. It's still slightly less than half the speed of the C++ version, but I'm out of ideas now. I thought the C++ version might be vectorised but I checked the objdump and it wasn't, and I made an attempt at vectorising the D version, via inline SSE, but that didn't turn out to be significantly faster. I considered putting 'meal' straight into 'find_stable_forests', like: auto find_stable_forests(in forest_t forest){ forest_t[] forests = [forest]; return forests .recurrence!((a,n) => a[n-1].map!(a => [forest_t(-1, -1, +1)+a, forest_t(-1, +1, -1)+a, forest_t(+1, -1, -1)+a]) .join .partition!(forest_invalid) .uniq) .until!(a => !devouring_possible(a)); But I couldn't get the types to match up. If anyone's interested, my attempt at vectorising it was as follows. Interestingly, I found that taking the address of potentialMeals didn't work if it was made immutable or static, which is a pity as it seems unnecessary to allocate it every iteration. auto forestsAddr = forests.ptr; size_t forLen = forests.length; scope forest_t[] newFors = uninitializedArray!(forest_t[])(forLen*3); auto newForsAddr = newFors.ptr; size_t bytesToSimd = (forLen)*3*4; int[12] potentialMeals = [-1, -1, 1, 0, -1, 1, -1, 0, 1, -1, -1, 0]; asm{ movupd XMM0, [potentialMeals]; movupd XMM1, [potentialMeals+16]; movupd XMM2, [potentialMeals+32]; mov R8, forestsAddr; mov R9, forestsAddr; add R9, bytesToSimd; mov RCX, newForsAddr; loop:; movupd XMM3, [R8]; movupd XMM4, [R8]; movupd XMM5, [R8]; paddd XMM3, XMM0; paddd XMM4, XMM1; paddd XMM5, XMM2; movupd [RCX], XMM3; movupd [RCX+12], XMM4; movupd [RCX+24], XMM5; add RCX, 36; add R8, 12; cmp R8, R9; jne loop; }
Jun 08 2014
parent reply Joseph Rushton Wakeling via Digitalmars-d <digitalmars-d puremagic.com> writes:
On 08/06/14 10:18, logicchains via Digitalmars-d wrote:
 It's still slightly less than half the speed of the C++ version, but I'm out of
 ideas now. I thought the C++ version might be vectorised but I checked the
 objdump and it wasn't, and I made an attempt at vectorising the D version, via
 inline SSE, but that didn't turn out to be significantly faster.
What compiler are you using? DMD will in general produce slower executables than LDC or GDC -- about half as fast sounds quite plausible. If your C++ code is compiled with gcc or clang, you're not doing a fair comparison. If you haven't already, try compiling with LDC or GDC, with full-on -O -release -inline -noboundscheck options, and see how that compares with the C++ code.
Jun 08 2014
parent "logicchains" <jonathan.t.barnard gmail.com> writes:
On Sunday, 8 June 2014 at 15:03:32 UTC, Joseph Rushton Wakeling 
via Digitalmars-d wrote:
 What compiler are you using?

 DMD will in general produce slower executables than LDC or GDC 
 -- about half as fast sounds quite plausible.  If your C++ code 
 is compiled with gcc or clang, you're not doing a fair 
 comparison.

 If you haven't already, try compiling with LDC or GDC, with 
 full-on -O -release -inline -noboundscheck options, and see how 
 that compares with the C++ code.
I was using ldc2, with -O5 -release -disable-boundscheck. I wrote a C translation of the C++ implementation that differs only in that it uses quicksort instead of whatever std::stable_sort uses, and it runs in around the same time as the D implementation for input 100 100 100 (around 90ms, compared to 65ms for C++), so I suspect the C++ sorting algorithm is just slightly faster for this use case. Interestingly, when I increase the input to 400 or 500, while the C version retains the same speed ratio to the C++ one, the D version becomes around 3-4 times slower. I suspect this might be due to garbage collection, as putting everything into a hashmap then removing it again as a method to find the unique values probably generates a lot of garbage. Although I'd have assumed it was only O(N), compared to O(NlogN) for the sort+uniq method that the C and C++ versions use, so in that sense I'm surprised that the D version doesn't become comparatively faster as input increases.
Jun 09 2014
prev sibling parent reply Nick Treleaven <ntrel-public yahoo.co.uk> writes:
On 07/06/2014 16:39, monarch_dodra wrote:
 Then, you are using the code:
 .partition!(forest_invalid).sort.uniq.array;

 The "advantage" of "partition" over "filter" is that "partition" is a
 greedy inplace algorithm. But then, you go on to use "uniq", which is
 lazy, and requires "array". I implemented a (very) simplistic in-place
 "removeDuplicates":
More generally, perhaps we could have a function to apply a lazy range to an existing random-access range: import std.range; /** Copy up to r.length elements of src to r. */ auto refill(R, Input)(ref R r, Input src) if (isRandomAccessRange!R && isInputRange!Input) { foreach (i, ref e; r) { if (src.empty) { r.length = i; return r; } e = src.front; src.popFront; } // ignore leftover elements in src return r; } import std.algorithm : uniq; /// eager wrapper for uniq auto dedup(T)(ref T v){return v.refill(v.uniq);} //alias dedup = (ref v) => v.refill(v.uniq); void main() { auto a = [5,5,5,5,4,3,3,3,1]; a.dedup; assert(a == [5, 4, 3, 1]); }
Jun 11 2014
parent Nick Treleaven <ntrel-public yahoo.co.uk> writes:
On 11/06/2014 14:45, Nick Treleaven wrote:
 /** Copy up to r.length elements of src to r. */
 auto refill(R, Input)(ref R r, Input src)
 if (isRandomAccessRange!R && isInputRange!Input)
Note this is related to: std.algorithm.fill(Range1 range, Range2 filler) Except that fill always fills all of range cyclically: I've removed 'ref' for r's parameter for consistency with std.algorithm's policy to only change content, not topology. Also some minor edits: https://github.com/ntrel/stuff/blob/master/refill.d I think refill doesn't need to require a random access range now though, maybe just a forward range with slicing, like std.range.take.
Jun 11 2014