sparksql查询的数据保存(sparksql命令行怎么保存数据)

技术Spark SQL数据加载和保存的实例分析今天就跟大家聊聊有关Spark SQL数据加载和保存的实例分析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。一、前置

今天就跟大家聊聊有关火花结构化查询语言数据加载和保存的实例分析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

一、前置知识详解 星火结构化查询语言重要是操作数据帧本身提供了救援和负荷的操作,加载:可以创建数据帧,保存:把数据帧中的数据保存到文件或者说与具体的格式来指明我们要读取的文件的类型以及与具体的格式来指出我们要输出的文件是什么类型。

二、Spark SQL读写数据代码实战

导入组织。阿帕奇。火花。sparkconf导入组织。阿帕奇。火花。API。Java。JavadD导入组织。阿帕奇。火花。API。Java。JavaPonekContext导入组织。阿帕奇。火花。API。Java。功能。功能;导入组织。阿帕奇。火花。SQL。*;导入组织。阿帕奇。火花。SQL。类型。数据类型;导入组织。阿帕奇。火花。SQL。类型。结构字段;导入组织。阿帕奇。火花。SQL。类型。结构类型;导入Java。乌提尔。ArrayList导入Java。乌提尔。列表;public class sparksqlloadsaveops { public stativonmain(String[]args){ SparkConfconf=NewsArkConf().setMaster('local ').setAppName(' SparkSQLLoadSaveOps ');javapanokontextsc=new javapanokontext(conf);SQLContext=Newsql上下文(sc);/* * *阅读()是数据框阅读器类型,负载可以将数据读取出来*/data framepeopledf=sqlcontext。read().格式(“json”).加载(' e : \ \ Spark \ \ Spark install _ package \ \ Big _ Data _ Software \ \ Spark-1。6 .0-bin-Hadoop 2.6 \ \示例\ \ src \ \ main \ \ resources \ \ people。JSON’);/***直接对数据帧进行操作* Json:是一种自解释的格式,读取Json的时候怎么判断其是什么格式?*通过扫描整个Json。扫描之后才会知道元数据*///通过方式来指定输出文件的是追加。创建新文件来追加文件peopleDF.select('name ').写入()。模式(保存模式。追加)。保存(' e : \ \个人姓名');}}读取过程源码分析如下: 1.阅读方法返回数据框阅读器,用于读取数据。

/* * * :实验: * Returnsa[[数据框阅读器]]thankanusedtoraddatainsa[[数据框]].* { { { * SQLContext。阅读。拼花地板('/路径/到/文件。拼花')* SQLContext。阅读。图式.json('/path/to/f

ile.json") * }}} * * @group genericdata * @since 1.4.0 */@Experimental//创建DataFrameReader实例,获得了DataFrameReader引用def read: DataFrameReader = new DataFrameReader(this)

2. 然后再调用DataFrameReader类中的format,指出读取文件的格式。

/** * Specifies the input data source format. * * @since 1.4.0 */def format(source: String): DataFrameReader = { this.source = source this}

3. 通过DtaFrameReader中load方法通过路径把传入过来的输入变成DataFrame。

/** * Loads input in as a [[DataFrame]], for data sources that require a path (e.g. data backed by * a local or distributed file system). * * @since 1.4.0 */// TODO: Remove this one in Spark 2.0.def load(path: String): DataFrame = { option("path", path).load()}

至此,数据的读取工作就完成了,下面就对DataFrame进行操作。 下面就是写操作!!!

1. 调用DataFrame中select函数进行对列筛选

/** * Selects a set of columns. This is a variant of `select` that can only select * existing columns using column names (i.e. cannot construct expressions). * * {{{ *  // The following two are equivalent: *  df.select("colA", "colB") *  df.select($"colA", $"colB") * }}} * @group dfops * @since 1.3.0 */@scala.annotation.varargsdef select(col: String, cols: String*): DataFrame = select((col +: cols).map(Column(_)) : _*)

2. 然后通过write将结果写入到外部存储系统中。

/** * :: Experimental :: * Interface for saving the content of the [[DataFrame]] out into external storage. * * @group output * @since 1.4.0 */@Experimentaldef write: DataFrameWriter = new DataFrameWriter(this)

3. 在保持文件的时候mode指定追加文件的方式

/** * Specifies the behavior when data or table already exists. Options include:// Overwrite是覆盖 *  - `SaveMode.Overwrite`: overwrite the existing data.//创建新的文件,然后追加 *  - `SaveMode.Append`: append the data. *  - `SaveMode.Ignore`: ignore the operation (i.e. no-op). *  - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime. * * @since 1.4.0 */def mode(saveMode: SaveMode): DataFrameWriter = { this.mode = saveMode this}

4. 最后,save()方法触发action,将文件输出到指定文件中。

/** * Saves the content of the [[DataFrame]] at the specified path. * * @since 1.4.0 */def save(path: String): Unit = { this.extraOptions += ("path" -> path) save()}

三、Spark SQL读写整个流程图如下

四、对于流程中部分函数源码详解

DataFrameReader.Load()

1. Load()返回DataFrame类型的数据集合,使用的数据是从默认的路径读取。

/** * Returns the dataset stored at path as a DataFrame, * using the default data source configured by spark.sql.sources.default. * * @group genericdata * @deprecated As of 1.4.0, replaced by `read().load(path)`. This will be removed in Spark 2.0. */@deprecated("Use read.load(path). This will be removed in Spark 2.0.", "1.4.0")def load(path: String): DataFrame = {//此时的read就是DataFrameReader read.load(path)}

2. 追踪load源码进去,源码如下:在DataFrameReader中的方法。Load()通过路径把输入传进来变成一个DataFrame。

/**  * Loads input in as a [[DataFrame]], for data sources that require a path (e.g. data backed by * a local or distributed file system). * * @since 1.4.0 */// TODO: Remove this one in Spark 2.0.def load(path: String): DataFrame = { option("path", path).load()}

3. 追踪load源码如下:

/** * Loads input in as a [[DataFrame]], for data sources that don't require a path (e.g. external * key-value stores). * * @since 1.4.0 */def load(): DataFrame = {//对传入的Source进行解析 val resolved = ResolvedDataSource(  sqlContext,  userSpecifiedSchema = userSpecifiedSchema,  partitionColumns = Array.empty[String],  provider = source,  options = extraOptions.toMap) DataFrame(sqlContext, LogicalRelation(resolved.relation))}

DataFrameReader.format()

1. Format:具体指定文件格式,这就获得一个巨大的启示是:如果是Json文件格式可以保持为Parquet等此类操作。 Spark SQL在读取文件的时候可以指定读取文件的类型。例如,Json,Parquet.

/** * Specifies the input data source format.Built-in options include “parquet”,”json”,etc. * * @since 1.4.0 */def format(source: String): DataFrameReader = { this.source = source //FileType this}

DataFrame.write()

1. 创建DataFrameWriter实例

/** * :: Experimental :: * Interface for saving the content of the [[DataFrame]] out into external storage. * * @group output * @since 1.4.0 */@Experimentaldef write: DataFrameWriter = new DataFrameWriter(this)1

2. 追踪DataFrameWriter源码如下:以DataFrame的方式向外部存储系统中写入数据。

/** * :: Experimental :: * Interface used to write a [[DataFrame]] to external storage systems (e.g. file systems, * key-value stores, etc). Use [[DataFrame.write]] to access this. * * @since 1.4.0 */@Experimentalfinal class DataFrameWriter private[sql](df: DataFrame) {

DataFrameWriter.mode()

1. Overwrite是覆盖,之前写的数据全都被覆盖了。 Append:是追加,对于普通文件是在一个文件中进行追加,但是对于parquet格式的文件则创建新的文件进行追加。

/** * Specifies the behavior when data or table already exists. Options include: *  - `SaveMode.Overwrite`: overwrite the existing data. *  - `SaveMode.Append`: append the data. *  - `SaveMode.Ignore`: ignore the operation (i.e. no-op).//默认操作 *  - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime. * * @since 1.4.0 */def mode(saveMode: SaveMode): DataFrameWriter = { this.mode = saveMode this}

2. 通过模式匹配接收外部参数

/** * Specifies the behavior when data or table already exists. Options include: *  - `overwrite`: overwrite the existing data. *  - `append`: append the data. *  - `ignore`: ignore the operation (i.e. no-op). *  - `error`: default option, throw an exception at runtime. * * @since 1.4.0 */def mode(saveMode: String): DataFrameWriter = { this.mode = saveMode.toLowerCase match {  case "overwrite" => SaveMode.Overwrite  case "append" => SaveMode.Append  case "ignore" => SaveMode.Ignore  case "error" | "default" => SaveMode.ErrorIfExists  case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +   "Accepted modes are 'overwrite', 'append', 'ignore', 'error'.") } this}

DataFrameWriter.save()

1. save将结果保存传入的路径。

/** * Saves the content of the [[DataFrame]] at the specified path. * * @since 1.4.0 */def save(path: String): Unit = { this.extraOptions += ("path" -> path) save()}

2. 追踪save方法。

/** * Saves the content of the [[DataFrame]] as the specified table. * * @since 1.4.0 */def save(): Unit = { ResolvedDataSource(  df.sqlContext,  source,  partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),  mode,  extraOptions.toMap,  df)}

3. 其中source是SQLConf的defaultDataSourceNameprivate var source: String = df.sqlContext.conf.defaultDataSourceName其中DEFAULT_DATA_SOURCE_NAME默认参数是parquet。

// This is used to set the default data sourceval DEFAULT_DATA_SOURCE_NAME = stringConf("spark.sql.sources.default", defaultValue = Some("org.apache.spark.sql.parquet"), doc = "The default data source to use in input/output.")

DataFrame.scala中部分函数详解:

1. toDF函数是将RDD转换成DataFrame

/** * Returns the object itself. * @group basic * @since 1.3.0 */// This is declared with parentheses to prevent the Scala compiler from treating// `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.def toDF(): DataFrame = this

2. show()方法:将结果显示出来

/** * Displays the [[DataFrame]] in a tabular form. For example: * {{{ *  year month AVG('Adj Close) MAX('Adj Close) *  1980 12  0.503218    0.595103 *  1981 01  0.523289    0.570307 *  1982 02  0.436504    0.475256 *  1983 03  0.410516    0.442194 *  1984 04  0.450090    0.483521 * }}} * @param numRows Number of rows to show * @param truncate Whether truncate long strings. If true, strings more than 20 characters will *       be truncated and all cells will be aligned right * * @group action * @since 1.5.0 */// scalastyle:off printlndef show(numRows: Int, truncate: Boolean): Unit = println(showString(numRows, truncate))// scalastyle:on println

追踪showString源码如下:showString中触发action收集数据。

/** * Compose the string representing rows for output * @param _numRows Number of rows to show * @param truncate Whether truncate long strings and align cells right */private[sql] def showString(_numRows: Int, truncate: Boolean = true): String = { val numRows = _numRows.max(0) val sb = new StringBuilder val takeResult = take(numRows + 1) val hasMoreData = takeResult.length > numRows val data = takeResult.take(numRows) val numCols = schema.fieldNames.length

看完上述内容,你们对Spark SQL数据加载和保存的实例分析有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。

内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/149945.html

(0)

相关推荐

  • 怎么配置Magento2 system.xml dateTime时间

    技术怎么配置Magento2 system.xml dateTime时间这篇文章主要讲解了“怎么配置Magento2 system.xml dateTime时间”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小

    攻略 2021年11月12日
  • 如何理解微服务与DevOps

    技术如何理解微服务与DevOps本篇内容介绍了“如何理解微服务与DevOps”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成

    攻略 2021年10月19日
  • 火车怎么读,以|A开头的火车, 应该怎样读?

    技术火车怎么读,以|A开头的火车, 应该怎样读?Z-直达特快列车T-特快列车K-快速列车N-管内快速列车(和K一个意思火车怎么读,咱们普通乘客不用特意区别)L、A-临时旅客列车Y-旅游列车没有字母的四位车次普通列车详细如

    生活 2021年10月29日
  • 描写夜晚环境的句子,晚上环境描写的句子关于亲情的

    技术描写夜晚环境的句子,晚上环境描写的句子关于亲情的父亲出门的时候描写夜晚环境的句子,月亮还没下山。父亲回家的时候,月亮早就上山了。五岁那年,我浑身长满了水痘,每天都要去医院打针,从家到医院大约有三四里路,每次去父亲总把

    生活 2021年10月26日
  • 2021暑假游记

    技术2021暑假游记 2021暑假游记\(Day\ 1:2021.7.11\)
    下午3点到校。
    晚上学了DFS序和欧拉序。懂了但没完全懂,而且用处不大。做了几道模板题。
    \(Day\ 2:2021.7.

    礼包 2021年10月19日
  • 怎么解析IE6,IE7浏览器兼容性问题

    技术怎么解析IE6,IE7浏览器兼容性问题这篇文章将为大家详细讲解有关怎么解析IE6,IE7浏览器兼容性问题,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。向大家描述一下I

    攻略 2021年11月25日