本文共 1336 字,大约阅读时间需要 4 分钟。
一、先开启Hadoop和spark
略
二、启动spark-shell
spark-shell --master local[2] --jars /usr/local/src/spark-1.6.1-bin-hadoop2.6/libext/com.mysql.jdbc.Driver.jar
1.读取spark目录下面的logs日志作为测试:
val alllog=sc.textFile("file:///usr/local/src/spark-1.6.1-bin-hadoop2.6/logs/*out*")
alllog.count 看看一共有347记录
2.转为为DataFrame
现在读取进来的是RDD格式,用map函数把每条记录转成一行
import org.apache.spark.sql.Rowval alllogRDD=alllog.map(x =>Row(x))import org.apache.spark.sql.types._val schemaString="line"val schema=StructType(schemaString.split(" ").map(fieldName =>StructField(fieldName,StringType,true)))val alllogDataFrame = sqlContext.createDataFrame(alllogRDD, schema)alllogDataFrame.printSchema #打印schemaalllogDataFrame.show(false) #这里的false表示不省略,否则跟下午一样,会三点省略
到此为止,已经把RDD转化为DataFrame了。
三、把DataFrame转为为表用SQL查询
alllogDataFrame.registerTempTable("log")
sqlContext.sql("SELECT * FROM log").show()
到此就可以使用SQL了。
四、读取和存储外部数据源
1.读取json文件
val df = sqlContext.read.format("json").load("file:///mnt/hgfs/vm/china.json")
df.printSchema
df.select("*").write.format("parquet").mode("overwrite").save("file:///mnt/hgfs/vm/china.parquet") #保存为parquet格式
这里的mode可以有overwrite,append,ignore等模式,也可以不用。
这样就直接生产DataFrame数据,不用添加schema信息了。
对于parquet文件,还有更高级的使用方法,直接读取文件就行了
val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
对于json里面有嵌套数组,想要展开成多行,可以在SQL中使用explode函
转载地址:http://osmjz.baihongyu.com/