今天需要从本地Resource文件夹里读取json文件,发现用如下代码有问题:

spark 2.X与1.x的区别

spark sql
2.x以上版本和1.x版本有个很大的区别:spark1.x的sqlContext在spark2.0中被整合到sparkSession,故而利用spark-shell客户端操作会有些许不同,具体如下文所述。


python学习day4之路文件的序列化和反序列化,

json和pickle序列化和反序列化

  json是用来实现不同程序之间的文件交互,由于不同程序之间需要进行文件信息交互,由于用python写的代码可能要与其他语言写的代码进行数据传输,json支持所有程序之间的交互,json将取代XML,由于XML格式稍微比较复杂。现在程序之间的交互都是用json来进行文件信息的交互。

  在使用json序列化和反序列化的时候,dump一次,就要load一次,不能操作。

  图片 1

  json序列化的过程,就是写入文件中,让另外一个编程语言进行调用:

  import json

  info = {“alex”:”sb”,”test”:”hard”}

  with open(“file”,”w”) as f:
    f.write(json.dumps(info))

  上述代码使用json将info字典信息写入到一个文件中,文件中只能存储字符串格式的信息,或者二进制文件的信息,不能存放数字等信息,放入文件中的信息都是字符串类型的,这点一定要注意.

  json反序列化的过程:

  import json

  ”’反序列化起始就是把dump进去的信息进行提取,以实现不同编程语言的交互”’

  with open(“file”,”r”) as f:
    data = json.loads(f.read())
    print(data)
    print(type(data))
    print(data[“alex”])  

  上面代码,将使用json格式存入的信息读取出来,如下所示:

  {‘test’: ‘hard’, ‘alex’: ‘sb’}
  <class ‘dict’>
  sb
  上述代码实现了将字符串信息读取问字典的功能,其实,序列化和反序列化就是将原来的格式先转化为字符串,然后在读取出来的过程,以便能够实现交互.

  我们也可以使用其他方式进行序列化和反序列化,我们知道,有一个函数eval(),能够实现把字符串信息转化为原本样式,如下:

  info = [11,22,33,65,33]

  with open(“test.text”,”w”) as f:
    f.write(str(info))
#使用wirte()只能向文件中写入字符串格式的信息,不能写入其他类型的信息

  with open(“test.text”,”r”) as f_obj:
    data = f_obj.read()

    data = eval(data)
    print(type(data))
    print(data)  

  程序运行如下:

  <class ‘list’>
  [11, 22, 33, 65, 33]
  上述过程中,我们利用python自带的eval()函数也实现了序列化和反序列化的过程,但是由于序列化和反序列化是在同一个程序中实现的,在其他程序中有没有eval()是不确定的,但是json支持所有的编程语言,所以现在一般都使用json实现不同编程语言之间的信息交互.

  dump和load也是实现上面dumps和loads的功能,只是实现的方式不一样而言,语法稍微有一些区别,如下:

  dump序列化:

  import json

  info = {“alex”:”sb”,”test”:”hard”}

  with open(“file”,”w”) as f:
    json.dump(info,f)

  load()反序列化:

  import json

  ”’反序列化起始就是把dump进去的信息进行提取,以实现不同编程语言的交互”’

  with open(“file”,”r”) as f:
    data = json.load(f)
    print(data)
    print(type(data))
    print(data[“alex”])  

  上面程序实现了序列化和反序列化的功能,dump(信息,文件路径),load(文件路径),从哪个文件读取信息.

  在不同程序间实现数据的交换.

  不同程序之间的数据交换,或者是将字符串的信息转化为原有的形式;

  eval()函数的功能也很强大,能够之间将字符串形式的信息转化为原有的信息,如下:

  >>> dic = “{‘alex’:’sb’,’try’:’workhard’}”
  >>> data = eval(dic)
  >>> data
  {‘try’: ‘workhard’, ‘alex’: ‘sb’}

  程序只dump一次,load一次,不能dump多次.dumps好几个文件实现; 

json和pickle序列化和反序列化
json是用来实现不同程序之间的文件交互,由于不同程序之间需要…

  • 代码一

载入外部数据的load方法

在spark
sql中有一个DataStreamReader封装了读取各种格式的外部数据的方法,其中,format(str)用于传数据格式,比如csv,json,parquet,jdbc等;load(path)用于传入数据的地址,其中可以传入本地数据路径也可以是hdfs上的路径,在官网给的demo中都是传的本地数据路径:比如:

val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
  • load(path)的源码:注意:load不能l载入hive的数据,hive数据需要使用table方法来载入。

    def load(path: String): DataFrame = {
     option("path", path).load()
       }
    
       def load(): DataFrame = {
     if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
       throw new AnalysisException("Hive data source can only be used with tables, you can not " +
         "read files of Hive data source directly.")
     }
    
     val dataSource =
       DataSource(
       sparkSession,
     userSpecifiedSchema = userSpecifiedSchema,
     className = source,
     options = extraOptions.toMap)
     Dataset.ofRows(sparkSession, StreamingRelation(dataSource))
       }
    
  • 【hdfs路径】写入写出hdfs上的路径,则需要加入hdfs的完全路径,如:

    studentDF.write.parquet(“hdfs://h4:9000/test/spark/parquet”)
    studentDF.write.json(“hdfs://h4:9000/test/spark/json”)


spark sql与mysql 和hdfs交互的实战

  • 1.添加jar包
  1. 正常配置不再赘述,这里如果需要读取MySQL数据,则需要在当前用户下的环境变量里额外加上JDBC的驱动jar包
    例如我的是:mysql-connector-java-5.1.18-bin.jar
    存放路径是$SPARK_HOME/jars 所以需要额外配置环境变量
    export PATH = $PATH:$SPARK_HOME/jars
  • 2.启动spark-shell

    bin/spark-shell
    –master=spark://h4:7077
    –driver-class-path=./jars/mysql-connector-java-5.1.18-bin.jar —
    jars=./jars/mysql-connector-java-5.1.18-bin.jar

  • 3.代码

spark-sql采用sql方式执行操作正常启动之后可以先通过spark-sql建立数据库并切换到当前新建的数据库
spark.sql(“create database spark”)
可以查看下是否新建成功
spark.sql(“show databases “).show
创建成功之后切换数据库
spark.sql(“use spark”)
现在开始读取远程MySQL数据
val sql = “””CREATE TABLE student USING org.apache.spark.sql.jdbc
OPTIONS ( url
“jdbc:mysql://worker2:3306/spark”,
dbtable “student”, user “root”, password “root” )”””
执行:
spark.sql(sql);

等待执行完毕之后,将表数据存入缓存
spark.sql(“cache table student”)
此时即可进行操作,例如:val studentDF = spark.sql(“select id,name from
student”)
完成需求查询之后,可将结果以parquet的格式保存到HDFS
studentDF.write.parquet(“hdfs://h4:9000/test/spark/parquet”)
也可以写成json格式
studentDF.write.json(“hdfs://h4:9000/test/spark/json”)

  • 4.性能:

集群状态下,硬件配置32G内存
2T硬盘,spark配了4核,内存分配了20G的情况下,测试速度如下:
2700万条记录的表导入spark用时1秒以内
sparksql将其以json格式存入HDFS用时288秒,共1.0G,将其以parquet格式存入HDFS用时207秒,共86.6M,可见parquet的优势还是比较明显

参考链接:
http://blog.51cto.com/10901776/1875371

NSString *path = [self pathForLocationsDataFile];locationDict = [[NSMutableDictionary alloc] initWithContentsOfFile:path];// locationDict一直为nil

刚开始一直以为路径下没有对应的json文件,后来跑到.app路径下看,json文件的确存在。

后来改用如下代码,ok了

Author

发表评论

电子邮件地址不会被公开。 必填项已用*标注