From 8a653a30f67e7eda18aa83e292d65d38c29fea9d Mon Sep 17 00:00:00 2001 From: Anton Panasenko Date: Fri, 17 May 2019 15:33:00 -0700 Subject: [PATCH] Type independent gens for TypedPipe/Execution --- .../ExecutionOptimizationRulesTest.scala | 20 +- .../scalding/typed/gen/ExecutionGen.scala | 61 +++++ .../twitter/scalding/typed/gen/StdGen.scala | 25 ++ .../scalding/typed/gen/StdSemigroup.scala | 17 ++ .../twitter/scalding/typed/gen/TypeGen.scala | 59 ++++ .../twitter/scalding/typed/gen/TypeWith.scala | 16 ++ .../scalding/typed/gen/TypedPipeGen.scala | 259 ++++++++++++++++++ 7 files changed, 453 insertions(+), 4 deletions(-) create mode 100644 scalding-core/src/test/scala/com/twitter/scalding/typed/gen/ExecutionGen.scala create mode 100644 scalding-core/src/test/scala/com/twitter/scalding/typed/gen/StdGen.scala create mode 100644 scalding-core/src/test/scala/com/twitter/scalding/typed/gen/StdSemigroup.scala create mode 100644 scalding-core/src/test/scala/com/twitter/scalding/typed/gen/TypeGen.scala create mode 100644 scalding-core/src/test/scala/com/twitter/scalding/typed/gen/TypeWith.scala create mode 100644 scalding-core/src/test/scala/com/twitter/scalding/typed/gen/TypedPipeGen.scala diff --git a/scalding-core/src/test/scala/com/twitter/scalding/ExecutionOptimizationRulesTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/ExecutionOptimizationRulesTest.scala index 9fcda42a91..0659b4b7d2 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/ExecutionOptimizationRulesTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/ExecutionOptimizationRulesTest.scala @@ -8,6 +8,8 @@ import cascading.tuple.{Fields, Tuple} import com.stripe.dagon.{Dag, Rule} import com.twitter.maple.tap.MemorySourceTap import com.twitter.scalding.typed.TypedPipeGen +import com.twitter.scalding.typed.gen +import com.twitter.scalding.typed.gen.{ExecutionGen, TypeGen, TypeWith} import java.io.{InputStream, OutputStream} import java.util.UUID import org.scalacheck.{Arbitrary, Gen} @@ -98,9 +100,19 @@ class ExecutionOptimizationRulesTest extends FunSuite with PropertyChecks { def write(pipe: Gen[TypedPipe[Int]]): Gen[Execution[TypedPipe[Int]]] = pipe.map(_.writeThrough(new MemorySource[Int]())) + def write(t: TypeWith[TypeGen])(pipe: Gen[TypedPipe[_]]): Gen[Execution[TypedPipe[_]]] = { + implicit val tc = t.evidence.tupleConverter + pipe.map(_.writeThrough(new MemorySource())) + } + val mappedOrFlatMapped = Gen.oneOf(mapped(pipe), flatMapped(pipe)) + val stdZippedWrites = + Gen.zip(gen.TypedPipeGen.pipeOf, gen.TypedPipeGen.pipeOf).flatMap { case ((l, tl), (r, tr)) => + zipped(write(tl)(l), write(tr)(r)) + } + val zippedWrites = zipped(write(TypedPipeGen.genWithIterableSources), write(TypedPipeGen.genWithIterableSources)) @@ -175,7 +187,7 @@ class ExecutionOptimizationRulesTest extends FunSuite with PropertyChecks { } test("randomly generated executions trees are invertible") { - forAll(genExec) { exec => + forAll(ExecutionGen.executionOf) { case (exec, _) => invert(exec) } } @@ -183,7 +195,7 @@ class ExecutionOptimizationRulesTest extends FunSuite with PropertyChecks { test("optimization rules are reproducible") { implicit val generatorDrivenConfig: PropertyCheckConfiguration = PropertyCheckConfiguration(minSuccessful = 500) - forAll(genExec, genRule) { (exec, rule) => + forAll(ExecutionGen.executionOf, genRule) { case ((exec, _), rule) => val optimized = ExecutionOptimizationRules.apply(exec, rule) val optimized2 = ExecutionOptimizationRules.apply(exec, rule) assert(optimized == optimized2) @@ -193,7 +205,7 @@ class ExecutionOptimizationRulesTest extends FunSuite with PropertyChecks { test("standard rules are reproducible") { implicit val generatorDrivenConfig: PropertyCheckConfiguration = PropertyCheckConfiguration(minSuccessful = 500) - forAll(genExec) { exec => + forAll(ExecutionGen.executionOf) { case (exec, _) => val optimized = ExecutionOptimizationRules.stdOptimizations(exec) val optimized2 = ExecutionOptimizationRules.stdOptimizations(exec) assert(optimized == optimized2) @@ -217,7 +229,7 @@ class ExecutionOptimizationRulesTest extends FunSuite with PropertyChecks { } test("zip of writes merged") { - forAll(zippedWrites) { e => + forAll(stdZippedWrites) { e => val opt = ExecutionOptimizationRules.apply(e, ZipWrite) assert(e.isInstanceOf[Execution.Zipped[_, _]]) diff --git a/scalding-core/src/test/scala/com/twitter/scalding/typed/gen/ExecutionGen.scala b/scalding-core/src/test/scala/com/twitter/scalding/typed/gen/ExecutionGen.scala new file mode 100644 index 0000000000..866ae7fd09 --- /dev/null +++ b/scalding-core/src/test/scala/com/twitter/scalding/typed/gen/ExecutionGen.scala @@ -0,0 +1,61 @@ +package com.twitter.scalding.typed.gen + +import com.twitter.scalding.Execution +import com.twitter.scalding.typed.TypedPipe +import org.scalacheck.{Cogen, Gen} + +object ExecutionGen { + import TypedPipeGen._ + + private[this] def cogen(t: TypeWith[TypeGen]): Cogen[TypedPipe[t.Type]] = + Cogen[TypedPipe[t.Type]] { pipe: TypedPipe[t.Type] => + pipe.hashCode.toLong + } + + def executionOf(implicit tg: Gen[TypeWith[TypeGen]]): Gen[(Execution[TypedPipe[_]], TypeWith[TypeGen])] = + tg.flatMap { t => + executionOf(t).map(_ -> t) + } + + def executionOf(a: TypeWith[TypeGen])(implicit tg: Gen[TypeWith[TypeGen]]): Gen[Execution[TypedPipe[a.Type]]] = + Gen.delay( + Gen.frequency( + 5 -> genFrom(a), + 1 -> genMap(a), + 1 -> genFlatMap(a), + 1 -> tg.flatMap { t => + Gen.oneOf( + genZipped(a, t).map(_.map(_._1)), + genZipped(t, a).map(_.map(_._2)) + ) + } + ) + ) + + def genFrom(a: TypeWith[TypeGen])(implicit tg: Gen[TypeWith[TypeGen]]): Gen[Execution[TypedPipe[a.Type]]] = + pipeOf(a).map(Execution.from(_)) + + def genMap(a: TypeWith[TypeGen])(implicit tg: Gen[TypeWith[TypeGen]]): Gen[Execution[TypedPipe[a.Type]]] = + tg.flatMap { t => + executionOf(t).flatMap { exec => + Gen.function1(pipeOf(a))(cogen(t)).map { f => + exec.map(f) + } + } + } + + def genFlatMap(a: TypeWith[TypeGen])(implicit tg: Gen[TypeWith[TypeGen]]): Gen[Execution[TypedPipe[a.Type]]] = + tg.flatMap { t => + executionOf(t).flatMap { exec => + Gen.function1(executionOf(a))(cogen(t)).map { f => + exec.flatMap(f) + } + } + } + + def genZipped(l: TypeWith[TypeGen], r: TypeWith[TypeGen])(implicit tg: Gen[TypeWith[TypeGen]]): Gen[Execution[(TypedPipe[l.Type], TypedPipe[r.Type])]] = + for { + le <- executionOf(l) + re <- executionOf(r) + } yield le.zip(re) +} diff --git a/scalding-core/src/test/scala/com/twitter/scalding/typed/gen/StdGen.scala b/scalding-core/src/test/scala/com/twitter/scalding/typed/gen/StdGen.scala new file mode 100644 index 0000000000..eb8ca8ab29 --- /dev/null +++ b/scalding-core/src/test/scala/com/twitter/scalding/typed/gen/StdGen.scala @@ -0,0 +1,25 @@ +package com.twitter.scalding.typed.gen + +import org.scalacheck.Gen + +object StdGen { + implicit val stringGen: Gen[String] = Gen.alphaStr + + implicit val charGen: Gen[Char] = Gen.alphaChar + + implicit val booleanGen: Gen[Boolean] = Gen.oneOf(true, false) + + implicit val unitGen: Gen[Unit] = Gen.const(()) + + implicit val byteGen: Gen[Byte] = Gen.chooseNum(Byte.MinValue, Byte.MaxValue) + + implicit val shortGen: Gen[Short] = Gen.chooseNum(Short.MinValue, Short.MaxValue) + + implicit val intGen: Gen[Int] = Gen.chooseNum(Int.MinValue, Int.MaxValue) + + implicit val longGen: Gen[Long] = Gen.chooseNum(Long.MinValue, Long.MaxValue) + + implicit val floatGen: Gen[Float] = Gen.chooseNum(Float.MinValue, Float.MaxValue) + + implicit val doubleGen: Gen[Double] = Gen.chooseNum(Double.MinValue, Double.MaxValue) +} diff --git a/scalding-core/src/test/scala/com/twitter/scalding/typed/gen/StdSemigroup.scala b/scalding-core/src/test/scala/com/twitter/scalding/typed/gen/StdSemigroup.scala new file mode 100644 index 0000000000..4c6e318185 --- /dev/null +++ b/scalding-core/src/test/scala/com/twitter/scalding/typed/gen/StdSemigroup.scala @@ -0,0 +1,17 @@ +package com.twitter.scalding.typed.gen + +import com.twitter.algebird.Semigroup + +object StdSemigroup { + implicit val byteGroup: Semigroup[Byte] = Semigroup.from { case (l, r) => + implicitly[Numeric[Byte]].plus(l, r) + } + + implicit val charGroup: Semigroup[Char] = Semigroup.from { case (l, r) => + implicitly[Numeric[Char]].plus(l, r) + } + + implicit val stringGroup: Semigroup[String] = Semigroup.from { case (l, r) => + l + r + } +} diff --git a/scalding-core/src/test/scala/com/twitter/scalding/typed/gen/TypeGen.scala b/scalding-core/src/test/scala/com/twitter/scalding/typed/gen/TypeGen.scala new file mode 100644 index 0000000000..6ea9bbbef3 --- /dev/null +++ b/scalding-core/src/test/scala/com/twitter/scalding/typed/gen/TypeGen.scala @@ -0,0 +1,59 @@ +package com.twitter.scalding.typed.gen + + +import com.twitter.algebird.Semigroup +import com.twitter.scalding.TupleConverter +import org.scalacheck.{Cogen, Gen} + +trait TypeGen[A] { + val gen: Gen[A] + val cogen: Cogen[A] + val ordering: Ordering[A] + val semigroup: Semigroup[A] + val tupleConverter: TupleConverter[A] +} + +object TypeGen { + import StdGen._ + import StdSemigroup._ + + def apply[A](g: Gen[A], c: Cogen[A], o: Ordering[A], s: Semigroup[A], t: TupleConverter[A]): TypeGen[A] = + new TypeGen[A] { + val gen: Gen[A] = g + val cogen: Cogen[A] = c + val ordering: Ordering[A] = o + val semigroup: Semigroup[A] = s + val tupleConverter: TupleConverter[A] = t + } + + def apply[A, B](a: TypeGen[A], b: TypeGen[B]): TypeGen[(A, B)] = + new TypeGen[(A, B)] { + val gen: Gen[(A, B)] = Gen.zip(a.gen, b.gen) + val cogen: Cogen[(A, B)] = Cogen.tuple2(a.cogen, b.cogen) + val ordering: Ordering[(A, B)] = Ordering.Tuple2(a.ordering, b.ordering) + val semigroup: Semigroup[(A, B)] = Semigroup.semigroup2(a.semigroup, b.semigroup) + val tupleConverter: TupleConverter[(A, B)] = + TupleConverter.build(a.tupleConverter.arity + b.tupleConverter.arity) { te => + val ta = a.tupleConverter.apply(te) + val tb = b.tupleConverter.apply(te) + (ta, tb) + } + } + + implicit def typeGen[A: Gen: Cogen: Ordering: Semigroup: TupleConverter]: TypeGen[A] = + TypeGen(implicitly, implicitly, implicitly, implicitly, implicitly) + + implicit val std: Gen[TypeWith[TypeGen]] = + Gen.oneOf( + TypeWith[Unit, TypeGen], + TypeWith[Boolean, TypeGen], + TypeWith[Byte, TypeGen], + TypeWith[Char, TypeGen], + TypeWith[Short, TypeGen], + TypeWith[Int, TypeGen], + TypeWith[Long, TypeGen], + TypeWith[Float, TypeGen], + TypeWith[Double, TypeGen], + TypeWith[String, TypeGen] + ) +} diff --git a/scalding-core/src/test/scala/com/twitter/scalding/typed/gen/TypeWith.scala b/scalding-core/src/test/scala/com/twitter/scalding/typed/gen/TypeWith.scala new file mode 100644 index 0000000000..7db7bdf3b3 --- /dev/null +++ b/scalding-core/src/test/scala/com/twitter/scalding/typed/gen/TypeWith.scala @@ -0,0 +1,16 @@ +package com.twitter.scalding.typed.gen + +sealed abstract class TypeWith[+Ev[_]] { + type Type + def evidence: Ev[Type] +} + +object TypeWith { + type Aux[A, Ev[_]] = TypeWith[Ev] { type Type = A } + + def apply[A, Ev[_]](implicit eva: Ev[A]): Aux[A, Ev] = + new TypeWith[Ev] { + type Type = A + def evidence: Ev[Type] = eva + } +} diff --git a/scalding-core/src/test/scala/com/twitter/scalding/typed/gen/TypedPipeGen.scala b/scalding-core/src/test/scala/com/twitter/scalding/typed/gen/TypedPipeGen.scala new file mode 100644 index 0000000000..19dab5b6a8 --- /dev/null +++ b/scalding-core/src/test/scala/com/twitter/scalding/typed/gen/TypedPipeGen.scala @@ -0,0 +1,259 @@ +package com.twitter.scalding.typed.gen + +import com.twitter.scalding.TypedPipe +import org.scalacheck.{Cogen, Gen} + +object TypedPipeGen { + def pipeOf(implicit tg: Gen[TypeWith[TypeGen]]): Gen[(TypedPipe[_], TypeWith[TypeGen])] = + tg.flatMap { t => + pipeOf(t).map(_ -> t) + } + + def pipeOf(a: TypeWith[TypeGen])(implicit tg: Gen[TypeWith[TypeGen]]): Gen[TypedPipe[a.Type]] = + Gen.delay( + Gen.frequency( + 5 -> genFrom(a), + 1 -> genFork(a), + 1 -> genForceToDisk(a), + 1 -> genDistinct(a), + 1 -> genFilter(a), + 1 -> genCollect(a), + 1 -> genMap(a), + 1 -> genFlatMap(a), + 1 -> genMerge(a), + 1 -> tg.flatMap { t => + Gen.oneOf( + pipeOf(a, t).map(_.keys), + pipeOf(t, a).map(_.values) + ) + }, + 1 -> Gen.zip(tg, tg).flatMap { case (t1, t2) => + Gen.oneOf( + pipeOf(a, t1, t2).map(_.keys), + pipeOf (t1, a, t2).map(_.values.keys), + pipeOf (t1, t2, a).map(_.values.values) + ) + } + ) + ) + + def pipeOf(k: TypeWith[TypeGen], v: TypeWith[TypeGen])(implicit tg: Gen[TypeWith[TypeGen]]): Gen[TypedPipe[(k.Type, v.Type)]] = + Gen.delay( + Gen.oneOf( + genGrouped(k, v), + genSortedGrouped(k, v), + genWithReducers(k, v), + genMapGroup(k, v), + genFilterKeys(k, v), + genMapValues(k, v), + genFlatMapValues(k, v), + genSumByKey(k, v), + genSumByLocalKeys(k, v), + genCross(k, v) + ) + ) + + def pipeOf( + k: TypeWith[TypeGen], + v1: TypeWith[TypeGen], + v2: TypeWith[TypeGen] + )(implicit tg: Gen[TypeWith[TypeGen]]): Gen[TypedPipe[(k.Type, (v1.Type, v2.Type))]] = + Gen.delay( + Gen.oneOf( + genJoin(k, v1, v2), + genHashJoin(k, v1, v2) + ) + ) + + def genFrom(a: TypeWith[TypeGen]): Gen[TypedPipe[a.Type]] = + for { + lst <- Gen.listOf(a.evidence.gen) + } yield TypedPipe.from(lst) + + def genFork(a: TypeWith[TypeGen])(implicit tg: Gen[TypeWith[TypeGen]]): Gen[TypedPipe[a.Type]] = + pipeOf(a).map(_.fork) + + def genForceToDisk(a: TypeWith[TypeGen])(implicit tg: Gen[TypeWith[TypeGen]]): Gen[TypedPipe[a.Type]] = + pipeOf(a).map(_.forceToDisk) + + def genDescription(a: TypeWith[TypeGen])(implicit tg: Gen[TypeWith[TypeGen]]): Gen[TypedPipe[a.Type]] = + Gen.identifier.flatMap { desc => + pipeOf(a).map(_.withDescription(desc)) + } + + def genMap(a: TypeWith[TypeGen])(implicit tg: Gen[TypeWith[TypeGen]]): Gen[TypedPipe[a.Type]] = + tg.flatMap { b => + pipeOf(b).flatMap { pipe => + Gen.function1(a.evidence.gen)(b.evidence.cogen).map { f => + pipe.map(f) + } + } + } + + def genFilter(a: TypeWith[TypeGen])(implicit tg: Gen[TypeWith[TypeGen]]): Gen[TypedPipe[a.Type]] = + pipeOf(a).flatMap { pipe => + Gen.function1(StdGen.booleanGen)(a.evidence.cogen).map { f => + pipe.filter(f) + } + } + + def genCollect(a: TypeWith[TypeGen])(implicit tg: Gen[TypeWith[TypeGen]]): Gen[TypedPipe[a.Type]] = + tg.flatMap { b => + pipeOf(b).flatMap { pipe => + Gen.zip( + Gen.function1(StdGen.booleanGen)(b.evidence.cogen), + Gen.function1(a.evidence.gen)(b.evidence.cogen) + ).flatMap { case (p, f) => + val pf = new PartialFunction[b.Type, a.Type] { + override def isDefinedAt(x: b.Type): Boolean = p.apply(x) + + override def apply(v1: b.Type): a.Type = f.apply(v1) + } + + pipe.collect(pf) + } + } + } + + def genFlatMap(a: TypeWith[TypeGen])(implicit tg: Gen[TypeWith[TypeGen]]): Gen[TypedPipe[a.Type]] = + tg.flatMap { b => + pipeOf(b).flatMap { pipe => + Gen.function1(Gen.listOf(a.evidence.gen))(b.evidence.cogen).map { f => + pipe.flatMap(f) + } + } + } + + def genMerge(a: TypeWith[TypeGen])(implicit tg: Gen[TypeWith[TypeGen]]): Gen[TypedPipe[a.Type]] = + for { + left <- pipeOf(a) + right <- pipeOf(a) + } yield left ++ right + + def genDistinct(a: TypeWith[TypeGen])(implicit tg: Gen[TypeWith[TypeGen]]): Gen[TypedPipe[a.Type]] = + pipeOf(a).map { pipe => + pipe.distinct(a.evidence.ordering) + } + + def genGrouped( + k: TypeWith[TypeGen], + v: TypeWith[TypeGen] + )(implicit tg: Gen[TypeWith[TypeGen]]): Gen[TypedPipe[(k.Type, v.Type)]] = + pipeOf( + TypeWith(TypeGen(k.evidence, v.evidence)) + ).map { pipe => + pipe.group(k.evidence.ordering) + } + + def genSortedGrouped( + k: TypeWith[TypeGen], + v: TypeWith[TypeGen] + )(implicit tg: Gen[TypeWith[TypeGen]]): Gen[TypedPipe[(k.Type, v.Type)]] = + pipeOf(k, v).map { pipe => + pipe.group(k.evidence.ordering).sorted(v.evidence.ordering) + } + + def genWithReducers( + k: TypeWith[TypeGen], + v: TypeWith[TypeGen] + )(implicit tg: Gen[TypeWith[TypeGen]]): Gen[TypedPipe[(k.Type, v.Type)]] = + pipeOf(k, v).flatMap { pipe => + StdGen.intGen.map { reducers => + pipe.group(k.evidence.ordering).withReducers(reducers) + } + } + + def genMapGroup( + k: TypeWith[TypeGen], + v: TypeWith[TypeGen] + )(implicit tg: Gen[TypeWith[TypeGen]]): Gen[TypedPipe[(k.Type, v.Type)]] = + tg.flatMap { t => + def cogenIter[A: Cogen]: Cogen[Iterator[A]] = + Cogen.it(_.toIterator) + + pipeOf(k, t).flatMap { pipe => + Gen.function1( + Gen.listOf(v.evidence.gen).map(_.toIterator) + )( + Cogen.tuple2(k.evidence.cogen, cogenIter(t.evidence.cogen)) + ).map { f => + pipe.group(k.evidence.ordering).mapGroup(Function.untupled(f)).toTypedPipe + } + } + } + + def genMapValues( + k: TypeWith[TypeGen], + v: TypeWith[TypeGen] + )(implicit tg: Gen[TypeWith[TypeGen]]): Gen[TypedPipe[(k.Type, v.Type)]] = + tg.flatMap { t => + pipeOf(k, t).flatMap { pipe => + Gen.function1(v.evidence.gen)(t.evidence.cogen).map { f => + pipe.mapValues(f) + } + } + } + + def genFlatMapValues( + k: TypeWith[TypeGen], + v: TypeWith[TypeGen] + )(implicit tg: Gen[TypeWith[TypeGen]]): Gen[TypedPipe[(k.Type, v.Type)]] = + tg.flatMap { t => + pipeOf(k, t).flatMap { pipe => + Gen.function1(Gen.listOf(v.evidence.gen))(t.evidence.cogen).map { f => + pipe.flatMapValues(f) + } + } + } + + def genFilterKeys( + k: TypeWith[TypeGen], + v: TypeWith[TypeGen] + )(implicit tg: Gen[TypeWith[TypeGen]]): Gen[TypedPipe[(k.Type, v.Type)]] = + pipeOf(k, v).flatMap { pipe => + Gen.function1(StdGen.booleanGen)(k.evidence.cogen).map { f => + pipe.filterKeys(f) + } + } + + def genSumByKey( + k: TypeWith[TypeGen], + v: TypeWith[TypeGen] + )(implicit tg: Gen[TypeWith[TypeGen]]): Gen[TypedPipe[(k.Type, v.Type)]] = + pipeOf(k, v).flatMap { pipe => + pipe.sumByKey(k.evidence.ordering, v.evidence.semigroup).toTypedPipe + } + + def genSumByLocalKeys( + k: TypeWith[TypeGen], + v: TypeWith[TypeGen] + )(implicit tg: Gen[TypeWith[TypeGen]]): Gen[TypedPipe[(k.Type, v.Type)]] = + pipeOf(k, v).flatMap { pipe => + pipe.sumByLocalKeys(v.evidence.semigroup) + } + + def genJoin( + k: TypeWith[TypeGen], + v1: TypeWith[TypeGen], + v2: TypeWith[TypeGen] + )(implicit tg: Gen[TypeWith[TypeGen]]): Gen[TypedPipe[(k.Type, (v1.Type, v2.Type))]] = + for { + g1 <- pipeOf(k, v1) + g2 <- pipeOf(k, v2) + } yield g1.group(k.evidence.ordering) join g2.group(k.evidence.ordering) + + def genHashJoin( + k: TypeWith[TypeGen], + v1: TypeWith[TypeGen], + v2: TypeWith[TypeGen] + )(implicit tg: Gen[TypeWith[TypeGen]]): Gen[TypedPipe[(k.Type, (v1.Type, v2.Type))]] = + for { + g1 <- pipeOf(k, v1) + g2 <- pipeOf(k, v2) + } yield g1.group(k.evidence.ordering) hashJoin g2.group(k.evidence.ordering) + + def genCross(a: TypeWith[TypeGen], b: TypeWith[TypeGen])(implicit tg: Gen[TypeWith[TypeGen]]): Gen[TypedPipe[(a.Type, b.Type)]] = + Gen.zip(pipeOf(a), pipeOf(b)).map { case (p1, p2) => + p1.cross(p2) + } +}