
Use RDD.union:
val rddPart1 = ???
val rddPart2 = ???
val rddAll = rddPart1.union(rddPart2)
val rdd1 = sc.parallelize(Seq((1, "Aug", 30),(1, "Sep", 31),(2, "Aug", 15),(2, "Sep", 10)))
val rdd2 = sc.parallelize(Seq((1, "Oct", 10),(1, "Nov", 12),(2, "Oct", 5),(2, "Nov", 15)))
rdd1.union(rdd2).collect

res0: Array[(Int, String, Int)] = Array((1,Aug,30), (1,Sep,31), (2,Aug,15), (2,Sep,10), (1,Oct,10), (1,Nov,12), (2,Oct,5), (2,Nov,15))

rddPart1.union(rddPart2) will add columns of rddPart2 to rddPart1. I need to add rows of rddPart2 to rddPart1. FYI, both the RDDs in this case have the same column names and types

While the example makes it look like concatenation takes place (rdd1 is followed by rdd2 in the output), I don't believe union makes any guarantees about the ordering of the data; the two could get mixed up with each other. Real concatenation is not so easy, because it implies an order dependency in your data, which works against the distributed nature of Spark.
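As a quick sanity check that union appends rows rather than columns, the element counts simply add up; a minimal sketch using the rdd1 and rdd2 defined above:

// union concatenates the two RDDs row-wise; the tuple "columns" are unchanged
val combined = rdd1.union(rdd2)
assert(combined.count == rdd1.count + rdd2.count)  // 8 == 4 + 4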

Concatenating datasets of different RDDs in Apache spark using scala -...

scala apache-spark apache-spark-sql distributed-computing rdd

The Scala SDK is not binary compatible between major releases (for example, 2.10 and 2.11). If you have Scala code that you will be using with Spark and that code is compiled against a particular major version of Scala (say 2.10) then you will need to use the compatible version of Spark. For example, if you are writing Spark 1.4.1 code in Scala and you are using the 2.11.4 compiler, then you should use Spark 1.4.1_2.11.

If you are not using Scala code then there should be no functional difference between Spark 1.4.1_2.10 and Spark 1.4.1_2.11 (if there is, it is most likely a bug). The only difference should be the version of the Scala compiler used to compile Spark and the corresponding libraries.
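As an illustration, here is a minimal sbt sketch (the build file itself is hypothetical, not from the question). The %% operator appends the Scala binary version to the artifact name, which is exactly why Spark artifacts carry a _2.10 or _2.11 suffix:

scalaVersion := "2.10.5"

// %% resolves to the artifact matching the Scala binary version above,
// i.e. org.apache.spark:spark-core_2.10:1.4.1 here
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1"

// with scalaVersion := "2.11.7" the same line would resolve to spark-core_2.11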

Shouldn't 2.10 and 2.11 be minor releases? Usually the first number is the major and the second number is the minor release.

Not for Scala. For example, 2.10.1 and 2.10.2 are two different minor releases that are binary compatible.

Why do apache spark artifact names include scala versions - Stack Over...

apache-spark

Use the dateFormat option:
val df = sqlContext.read
  .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
  .option("header","true")
  .option("delimiter",";")
  .option("dateFormat", "dd.MM.yyyy")
  .schema(customSchema)
  .load("data.csv")
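For completeness, a minimal sketch of what the customSchema used above could look like; the column names are assumptions, but the DateType field is what makes the dateFormat option take effect:

import org.apache.spark.sql.types._

// hypothetical columns; adjust names and types to match the actual CSV
val customSchema = StructType(Seq(
  StructField("id", IntegerType, nullable = true),
  StructField("created", DateType, nullable = true)  // parsed using the dd.MM.yyyy pattern
))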

Thanks, that worked. Can you tell me if there is a list of available options for the DataFrameReader? I could not find one yet.

sql - (Scala) Convert String to Date in Apache Spark - Stack Overflow

sql scala csv apache-spark

It's not feasible to run Spark's Kafka integration against Scala 2.11 for now (as of Spark 1.3).

If no pre-built version is available, you can build Spark yourself and fulfil your needs by specifying some build parameters.

export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"

mvn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package
Building for Scala 2.11

To produce a Spark package compiled with Scala 2.11, use the -Dscala-2.11 property:

dev/change-version-to-2.11.sh
mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package

Specifically, Spark's external Kafka library and JDBC component are not yet supported in Scala 2.11 builds.

And it is no longer experimental. After building Spark from source with 2.11, it worked fine for me.
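A quick way to confirm which Scala version your build ended up with is to check from spark-shell (a sketch; the exact output depends on your build):

// run inside spark-shell started from the freshly built distribution
scala> util.Properties.versionString   // e.g. "version 2.11.7" for a -Dscala-2.11 build
scala> sc.version                      // the Spark version itself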

How to run Kafka as a stream for Apache Spark using Scala 2.11? - Stac...

scala apache-spark apache-kafka

We haven't used Spark 2.0 with Scala 2.11 and notebooks in production yet. The root cause of your error is compatibility: based on the GitHub Toree description, the latest supported Scala version is 2.10.4 and you have 2.11.8. Try downgrading to 2.10 unless you have a hard production requirement to use only 2.11.

Is there any link on how to build Spark? I have the docs here, but I find it quite complicated: spark.apache.org/docs/latest/ Appreciate the help. Thanks

Oh, that's easy. Actually you have 2 ways to do what you need: 1. Compile Spark with Scala 2.10 compatibility. 2. Get the latest master branch of Toree from github.com/apache/incubator-toree and compile it in your local environment. They have Scala 2.11 and Spark 2.0 support in that non-released version.

Thank you very much for the help. One last thing: I got the latest branch from the link, but how do I compile it? I tried but have been having quite a number of errors. Here's my SO question: stackoverflow.com/questions/40732177/

I did the compilation on my Mac and it went well, without any of the issues you described. I can check it on an Ubuntu box in a while.

Apache Toree and Spark Scala Not Working in Jupyter - Stack Overflow

scala apache-spark jupyter-notebook apache-toree

Assuming the data frame is loaded with headers and the structure is flat:

val df = sqlContext.
    read.
    format("com.databricks.spark.csv").
    option("header", "true").
    load("data.csv")
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.countDistinct

// true if the column has more than 9 distinct values
def moreThan9(df: DataFrame, col: String) = {
    df.agg(countDistinct(col)).first()(0) match {
        case x: Long => x > 9L
        case _ => false
    }
}

val newDf = df.
    schema.       // Extract schema
    toArray.      // Convert to array of StructField
    map(_.name).  // Map to column names
    foldLeft(df)((df: DataFrame, col: String) => {
        if (col.endsWith("_txt") || moreThan9(df, col)) df.drop(col) else df
    })

If it is loaded without a header, you can do the same thing by mapping from the automatically assigned column names to the actual ones, for example as sketched below.
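For the headerless case, one hedged way to do that mapping is to rename the automatically assigned columns in order with toDF (the names used here are made up):

val dfNoHeader = sqlContext.
    read.
    format("com.databricks.spark.csv").
    load("data.csv")

// spark-csv assigns positional names (C0, C1, ...); replace them in order
val dfNamed = dfNoHeader.toDF("id", "description_txt", "score")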

Why foldLeft and not filter?

@dskrvk Do you mean something like .filter(...).foreach(col => df = df.drop(col))? Simply a matter of referential transparency and a personal taste I guess.

Data preprocessing with apache spark and scala - Stack Overflow

scala apache-spark rdd

I had the same problem. To combine by row instead of by column, use unionAll:

val rddPart1= ???
val rddPart2= ???
val rddAll = rddPart1.unionAll(rddPart2)

Concatenating datasets of different RDDs in Apache spark using scala -...

scala apache-spark apache-spark-sql distributed-computing rdd

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint

// tags_table is assumed to be a Map[String, Int] mapping each tag to its index
def createArray(values: List[String]): Vector = {
    val arr = new Array[Double](tags_table.size)
    tags_table.foreach { case (tag, idx) => arr(idx) = if (values.contains(tag)) 1.0 else 0.0 }
    Vectors.dense(arr)
}

val data_tmp = result.map(x => createArray(x._2))
val parsedData = data_tmp.map { line => LabeledPoint(1.0, line) }

Convert RDD of Vector in LabeledPoint using Scala - MLLib in Apache Sp...

scala apache-spark label apache-spark-mllib

In Spark >= 1.6 it is possible to use partitioning by column for queries and caching (see SPARK-11410 and SPARK-4849) using the repartition method:

val df = Seq(
  ("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")

val partitioned = df.repartition($"k")
partitioned.explain

// scala> df.repartition($"k").explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Physical Plan ==
// TungstenExchange hashpartitioning(k#7,200), None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- Scan PhysicalRDD[_1#5,_2#6]

Unlike RDDs, a Spark Dataset (including Dataset[Row], a.k.a. DataFrame) cannot use a custom partitioner for now. You can typically address that by creating an artificial partitioning column (sketched below), but it won't give you the same flexibility.
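A minimal sketch of that artificial-column workaround; the bucket count and column name are assumptions, not part of the original answer:

import org.apache.spark.sql.functions.{col, lit, pmod}

// derive a coarse "bucket" column from an existing numeric column
// and repartition by it (Spark >= 1.6)
val bucketed = df
  .withColumn("bucket", pmod(col("v"), lit(8)))
  .repartition(col("bucket"))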

One thing you can do is to pre-partition the input data before you create a DataFrame:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.HashPartitioner

val schema = StructType(Seq(
  StructField("x", StringType, false),
  StructField("y", LongType, false),
  StructField("z", DoubleType, false)
))

val rdd = sc.parallelize(Seq(
  Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
  Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
))

val partitioner = new HashPartitioner(5) 

val partitioned = rdd.map(r => (r.getString(0), r))
  .partitionBy(partitioner)
  .values

val df = sqlContext.createDataFrame(partitioned, schema)

Since DataFrame creation from an RDD requires only a simple map phase, the existing partition layout should be preserved*:

assert(df.rdd.partitions == partitioned.partitions)

The same way you can repartition existing DataFrame:

sqlContext.createDataFrame(
  df.rdd.map(r => (r.getLong(1), r)).partitionBy(partitioner).values,
  df.schema
)

So it looks like it is not impossible. The question remains whether it makes sense at all. I will argue that most of the time it doesn't:

Repartitioning is an expensive process. In a typical scenario most of the data has to be serialized, shuffled and deserialized. On the other hand, the number of operations that can benefit from pre-partitioned data is relatively small, and it is further limited if the internal API is not designed to leverage this property:

  • joins in some scenarios, but it would require internal support,
  • window function calls with a matching partitioner. Same as above, limited to a single window definition. It is already partitioned internally though, so pre-partitioning may be redundant,
  • simple aggregations with GROUP BY - it is possible to reduce the memory footprint of the temporary buffers**, but the overall cost is much higher. More or less equivalent to groupByKey.mapValues(_.reduce) (current behavior) vs reduceByKey (pre-partitioning). Unlikely to be useful in practice,
  • data compression with SqlContext.cacheTable - applying OrderedRDDFunctions.repartitionAndSortWithinPartitions beforehand can improve the compression ratio.

Performance is highly dependent on the distribution of the keys. If it is skewed it will result in suboptimal resource utilization. In the worst-case scenario it will be impossible to finish the job at all.

The whole point of using a high-level declarative API is to isolate yourself from low-level implementation details. As already mentioned by @dwysakowicz and @RomiKuntsman, optimization is a job for the Catalyst Optimizer. It is a pretty sophisticated beast and I really doubt you can easily improve on it without diving much deeper into its internals.

Partitioning with JDBC sources

JDBC data sources support a predicates argument. It can be used as follows:

sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)

It creates a single JDBC partition per predicate. Keep in mind that if sets created using individual predicates are not disjoint you'll see duplicates in the resulting table.
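For example, non-overlapping predicates can be generated programmatically; the id column and the modulus here are hypothetical:

// four disjoint partitions: id % 4 = 0, 1, 2, 3
val predicates = (0 until 4).map(i => s"id % 4 = $i").toArray

val jdbcDF = sqlContext.read.jdbc(url, table, predicates, props)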

partitionBy

Spark DataFrameWriter provides a partitionBy method which can be used to "partition" data on write. It separates the data on write using the provided set of columns:

val df = Seq(
  ("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
).toDF("k", "v")

df.write.partitionBy("k").json("/tmp/foo.json")
val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k" === "bar")

but it is not equivalent to DataFrame.repartition. In particular aggregations like:

val cnts = df1.groupBy($"k").sum()

will still require TungstenExchange:

cnts.explain

// == Physical Plan ==
// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
// +- TungstenExchange hashpartitioning(k#90,200), None
//    +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
//       +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json
bucketBy

DataFrameWriter also provides a bucketBy method, which has similar applications to partitionBy but is available only for tables (saveAsTable). As of today (Spark 2.1.0) it doesn't look like there are any execution plan optimizations applied to bucketed tables.
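For reference, a minimal Spark 2.x sketch of bucketBy; the bucket count and table name are made up:

df.write
  .bucketBy(42, "k")           // 42 buckets, hashed on column k
  .sortBy("k")
  .saveAsTable("bucketed_foo") // bucketBy requires saveAsTable rather than a plain save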

* By partition layout I mean only the data distribution. The partitioned RDD no longer has a partitioner.
** Assuming no early projection. If the aggregation covers only a small subset of columns there is probably no gain whatsoever.

@bychance Yes and no. Data layout will be preserved but AFAIK it won't give you benefits like partition pruning.

@zero323 Thanks. Is there a way to check the partition allocation of the parquet files, to validate that df.write indeed saved the layout? And if I do df.repartition("A") and then df.write.partitionBy("B"), the physical folder structure will be partitioned by B; within each B value folder, will it still keep the partitioning by A?

@bychance DataFrameWriter.partitionBy is logically not the same as DataFrame.repartition. The former doesn't shuffle, it simply separates the output. Regarding the first question: data is saved per partition and there is no shuffle. You can easily check that by reading the individual files. But Spark alone has no way to know about it if this is what you really want.

@zero323 I have a similar question. I am trying to optimize my code using DataFrames and SQL and I'm not sure how to optimize further. Do you have any suggestions for this?

@zero323 What's the reason why a Dataset/DataFrame cannot use a custom partitioner?


scala - How to define partitioning of DataFrame? - Stack Overflow

scala apache-spark dataframe apache-spark-sql

There's a built-in collectAsMap function in PairRDDFunctions that would deliver you a map of the pair values in the RDD.

val vertexMap = vertices.zipWithUniqueId.collectAsMap

It's important to remember that an RDD is a distributed data structure. You can visualize it as 'pieces' of your data spread over the cluster. When you collect, you force all those pieces to go to the driver, and to be able to do that, they need to fit in the memory of the driver.

From the comments, it looks like in your case you need to deal with a large dataset. Making a Map out of it is not going to work, as it won't fit in the driver's memory, causing OOM exceptions if you try.

You probably need to keep the dataset as an RDD. If you are creating a Map in order to look up elements, you could use lookup on a PairRDD instead, like this:

import org.apache.spark.SparkContext._  // import implicit conversions to support PairRDDFunctions

val vertexMap = vertices.zipWithUniqueId
val vertixYId = vertexMap.lookup("vertexY")

if you have unique values in your left tuple already, do you need to use zipWithUniqueId?

@maasg does lookup works across the worker nodes in RDDs?

@santhosh yes. lookup works across the complete distributed RDD

apache spark - How to convert Scala RDD to Map - Stack Overflow

scala apache-spark

val rows: RDD[Row] = df.rdd
val rows: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = df.rdd

To extend Boern's answer, add the following two import commands: import org.apache.spark.rdd.RDD import org.apache.spark.sql.Row
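Putting the answer and the comment together, a self-contained version (assuming df is an existing DataFrame):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

val rows: RDD[Row] = df.rdd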

apache spark - How to convert DataFrame to RDD in Scala? - Stack Overf...

scala apache-spark apache-spark-sql spark-dataframe

You need to sort the RDD and take the element in the middle or the average of the two middle elements. Here is an example with RDD[Int]:

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

val rdd: RDD[Int] = ???

val sorted = rdd.sortBy(identity).zipWithIndex().map {
  case (v, idx) => (idx, v)
}

val count = sorted.count()

val median: Double = if (count % 2 == 0) {
  val l = count / 2 - 1
  val r = l + 1
  (sorted.lookup(l).head + sorted.lookup(r).head).toDouble / 2
} else sorted.lookup(count / 2).head.toDouble
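As a small usage example (not from the original answer), plugging a concrete RDD into the snippet above averages the two middle elements when the count is even:

val rdd: RDD[Int] = sc.parallelize(Seq(3, 1, 4, 1, 5, 9, 2, 6))
// sorted: 1, 1, 2, 3, 4, 5, 6, 9 -> count = 8, median = (3 + 4) / 2 = 3.5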

What is this "lookup" method? AFAIK it does not exist in RDD.

p.s. I think that there are faster algorithms for finding median that don't require full sorting (en.wikipedia.org/wiki/Selection_algorithm)

unfortunately they are not applicable to distributed RDD

Can DataFrame API be used instead of RDD API?

scala - How can I calculate exact median with Apache Spark? - Stack Ov...

scala hadoop bigdata apache-spark

Just paste into a spark-shell:

val a = 
  Array(
    Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"), 
    Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"))

val rdd = sc.makeRDD(a)

case class X(callId: String, oCallId: String, 
  callTime: String, duration: String, calltype: String, swId: String)

Then map() over the RDD to create instances of the case class, and then create the DataFrame using toDF():

scala> val df = rdd.map { 
  case Array(s0, s1, s2, s3, s4, s5) => X(s0, s1, s2, s3, s4, s5) }.toDF()
df: org.apache.spark.sql.DataFrame = 
  [callId: string, oCallId: string, callTime: string, 
    duration: string, calltype: string, swId: string]

This infers the schema from the case class.

Then you can proceed with:

scala> df.printSchema()
root
 |-- callId: string (nullable = true)
 |-- oCallId: string (nullable = true)
 |-- callTime: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- calltype: string (nullable = true)
 |-- swId: string (nullable = true)

scala> df.show()
+----------+-------+-------------------+--------+--------+----+
|    callId|oCallId|           callTime|duration|calltype|swId|
+----------+-------+-------------------+--------+--------+----+
|4580056797|      0|2015-07-29 10:38:42|       0|       1|   1|
|4580056797|      0|2015-07-29 10:38:42|       0|       1|   1|
+----------+-------+-------------------+--------+--------+----+

If you want to use toDF() in a normal program (not in the spark-shell), make sure (quoted from here):

  • to import sqlContext.implicits._ right after creating the SQLContext
  • to define the case class outside of the method using toDF()

awesome answer, I got everything needed from this. Thanks a lot

hadoop - Convert RDD to Dataframe in Spark/Scala - Stack Overflow

scala hadoop apache-spark

$() is a Spark method defined in the trait Params. It simply calls getOrDefault on the Params instance:

/** An alias for [[getOrDefault()]]. */
protected final def $[T](param: Param[T]): T = getOrDefault(param)
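To make the context concrete, here is a hedged sketch of a custom Params owner that reads a parameter back through $(...); the class and parameter names are made up, not from the question:

import org.apache.spark.ml.param.{DoubleParam, ParamMap, Params}
import org.apache.spark.ml.util.Identifiable

class Thresholder(override val uid: String) extends Params {
  def this() = this(Identifiable.randomUID("thresholder"))

  // declare the Param and give it a default value
  final val threshold = new DoubleParam(this, "threshold", "decision threshold")
  setDefault(threshold -> 0.5)

  // $(threshold) is shorthand for getOrDefault(threshold)
  def getThreshold: Double = $(threshold)

  override def copy(extra: ParamMap): Thresholder = defaultCopy(extra)
}

// new Thresholder().getThreshold returns 0.5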

Great to hear @PunyTitan. If you're satisfied with an answer then you can simply accept it as the answer to your question in order to show your gratitude :-)

apache spark - What does $( ) mean in Scala? - Stack Overflow

scala apache-spark

When splitting a String on a literal pipe with split you have to escape it:

line.split("\\|")

otherwise it is interpreted as an alternation between two empty patterns.

Alternatively you can pass a Character:

line.split('|')

or an Array of Character:

line.split(Array('|'))

names.map(_.split("\\|")).collect {
  case Array(x, _, y) => (x, y)
}
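A tiny usage example, assuming names is an RDD[String] as in the question (the sample records are made up):

val names = sc.parallelize(Seq("john|42|london", "jane|35|paris"))

names.map(_.split("\\|")).collect {
  case Array(x, _, y) => (x, y)
}.collect()
// Array((john,london), (jane,paris))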

scala - Apache Spark RDD Split "|" - Stack Overflow

scala apache-spark

val rdd: RDD[String] = ???

val map: Map[String, Long] = rdd.zipWithUniqueId().collect().toMap

No. You can use NoSQL storage (Cassandra, for example) to load your RDD and access it with a Map-like interface.

You can use github.com/datastax/spark-cassandra-connector to save your RDD[(String, Long)] as a Cassandra table, and later use it for fast by-key lookup.

apache spark - How to convert Scala RDD to Map - Stack Overflow

scala apache-spark

You can use the map function with pattern matching to do the job here:

import org.apache.spark.sql.Row

dataFrame
  .map { case Row(name, age) => Map("name" -> name, "age" -> age) }

This gives you an RDD[Map[String, Any]].

apache spark - Convert DataFrame to RDD[Map] in Scala - Stack Overflow

scala apache-spark

The driver program is responsible for creating the SparkContext and SQLContext and for scheduling tasks on the worker nodes. This includes creating logical and physical plans and applying optimizations. To be able to do that it has to have access to the data source schema and possibly other information such as statistics. Implementation details vary from source to source, but generally speaking it means that the data should be accessible on all nodes, including the application master.

At the end of the day your expectations are almost correct. Chunks of the data are fetched individually on each worker without going through the driver program, but the driver has to be able to connect to Cassandra to fetch the required metadata.

In this case it is the metadata about the token range and how it is divided between the nodes. +1

scala - Apache Spark: Driver (instead of just the Executors) tries to ...

scala apache-spark cassandra

At heart, Cascading is a higher-level API on top of execution engines like MapReduce. It is analogous to Apache Crunch in this sense. Cascading has a few other related projects, like a Scala version (Scalding), and PMML scoring (Pattern).

Apache Spark is similar in the sense that it exposes a high-level API for data pipelines, and one that is available in Java and Scala.

It's more of an execution engine itself, than a layer on top of one. It has a number of associated projects, like MLlib, Streaming, GraphX, for ML, stream processing, graph computations.

Overall I find Spark a lot more interesting today, but they're not exactly for the same thing.

Cascading aims to support Spark as an "execution fabric". See cascading.org/new-fabric-support for more details.

Spark would more properly be compared to MapReduce, which contrasts in-memory processing (Spark) with disk-based processing (MapReduce). Cascading currently is just an interface for writing MapReduce jobs.

java - Apache Spark or Cascading framework? - Stack Overflow

java apache-spark cascading