/**
 * Per-partition local optimization step of the distributed structured-SVM solver.
 * Consumes one partition's examples (with their primal information and optional
 * oracle caches) and produces the partition's update, to be merged by [[reducer]].
 *
 * NOTE(review): no body is visible in this chunk; parameter/return semantics below
 * are inferred from the inline comments and from the reducer's parameter comment
 * ("Each block contains (\Delta LocalModel, [\Delta PrimalInfo_i])") -- confirm
 * against the implementation.
 *
 * @param dataIterator      this partition's data: (index, (labeled example,
 *                          primal info, optional bounded cache of candidate labels))
 * @param localModel        model replica this partition optimizes from
 * @param featureFn         joint feature map
 * @param lossFn            structured loss
 * @param oracleFn          (loss-augmented) inference oracle
 * @param predictFn         prediction function
 * @param solverOptions     solver configuration
 * @param averagedPrimalInfo running weighted average of the primals -- presumably
 *                          feeding the weighted-average model output; verify
 * @param iterCount         current outer (communication-round) iteration count
 * @param miniBatchEnabled  whether mini-batch updates are used
 * @return iterator of (local model update, per-index (primal update, updated cache),
 *         weighted-average model, averaged-primal update, iteration count) tuples --
 *         presumably deltas rather than absolute values (see reducer); confirm
 */
def mapper(dataIterator: Iterator[(Index, (LabeledObject[X, Y], PrimalInfo, Option[BoundedCacheList[Y]]))],
localModel: StructSVMModel[X, Y],
featureFn: (Y, X) => Vector[Double], // (y, x) => feature vector
lossFn: (Y, Y) => Double, // (yTruth, yPredict) => loss value
oracleFn: (StructSVMModel[X, Y], Y, X) => Y, // (model, y_i, x_i) => most violating label
predictFn: (StructSVMModel[X, Y], X) => Y,
solverOptions: SolverOptions[X, Y],
averagedPrimalInfo: PrimalInfo,
iterCount: Int,
miniBatchEnabled: Boolean): Iterator[(StructSVMModel[X, Y], Array[(Index, (PrimalInfo, Option[BoundedCacheList[Y]]))], StructSVMModel[X, Y], PrimalInfo, Int // (update, per-index primals/caches, weighted-average model, averaged primals, iter count)
)]
/**
 * Merge step: combines the k per-partition blocks produced by [[mapper]] into a
 * new global model, updated per-example primals, updated oracle caches, and a new
 * weighted average of the primals.
 *
 * NOTE(review): no body is visible in this chunk; the merge rule (e.g. averaging
 * the deltas, scaled by beta) is inferred from the parameter names -- confirm.
 *
 * @param zippedModels k per-partition result blocks, each
 *                     (\Delta LocalModel, [\Delta PrimalInfo_i], weighted-average
 *                     model, averaged-primal delta, iteration count)
 * @param oldPrimalInfo   per-example primal info from the previous round
 * @param oldCache        per-example oracle caches from the previous round
 * @param oldGlobalModel  global model from the previous round
 * @param oldWeightedAveragePrimals previous weighted average of the primals
 * @param d               feature-space dimensionality -- presumably; verify
 * @param beta            merge/averaging weight for the aggregated updates
 * @return (new global model, updated primals, updated caches,
 *         new weighted-average primals, iteration count)
 */
def reducer( // sc: SparkContext,
zippedModels: RDD[(StructSVMModel[X, Y], Array[(Index, (PrimalInfo, Option[BoundedCacheList[Y]]))], StructSVMModel[X, Y], PrimalInfo, Int)], // k blocks from the optimize step; each contains (\Delta LocalModel, [\Delta PrimalInfo_i])
oldPrimalInfo: RDD[(Index, PrimalInfo)],
oldCache: RDD[(Index, BoundedCacheList[Y])],
oldGlobalModel: StructSVMModel[X, Y],
oldWeightedAveragePrimals: PrimalInfo,
d: Int,
beta: Double): (StructSVMModel[X, Y], RDD[(Index, PrimalInfo)], RDD[(Index, BoundedCacheList[Y])], PrimalInfo, Int)
// TODO: Refactor the large tuples exchanged between communication rounds
// (mapper output / reducer input) into a dedicated case class, so each field
// has a name instead of a positional slot.