任务类型
离线开发支持多种计算引擎,不同的计算引擎可以支持不同的任务类型,下面按引擎类型分别说明。
SparkSQL
SparkSQL是基本的SQL类任务,用户在页面运行SparkSQL时,有如下特殊处理:
-
DDL:执行建表、删表、查询元数据等操作时,系统直连Spark Thrift Server进行操作;
-
DML:当进行INSERT / SELECT 等操作时,从系统稳定性考虑,不会直连Spark Thrift Server进行查询,而是经由后台调度系统封装为YARN任务进行操作。因此用户感官上会认为这类任务比直连Spark Thrift Server较慢。
-
SELECT * FROM t
的特殊处理:为提高响应速度,当执行这类SQL时,系统不会提交至调度系统,而是根据此表所在的HDFS目录,直接读取HDFS中的文件。
离线开发默认不允许用户绕过平台,直连Spark Thrift Server进行操作,直连操作(尤其是数据量较大时)可能会导致服务不稳定 |
默认环境参数
Spark
Spark任务,需用户在本地基于Spark的MapReduce编程接口(Java API或Scala API),并打为Jar包,提前将资源包通过「资源管理」模块上传至平台,之后创建Spark任务时引用此资源。
|
任务创建
-
资源
Spark任务需引用的资源包,需提前经「资源管理」上传至平台。一个任务只能引用一个资源包。
在进行代码打包时,为了缩小包的大小,Spark自带的包无需打包,Spark原生Jar包目录请参考Spark原生Jar包 |
-
mainClass
Jar包的入口类,格式为: org.apache.hadoop.examples
需填写完整类名
-
参数
传参方式与命令行传参形式一致,多个参数用空格隔开,支持填写系统参数或自定义参数(参考 参数配置),例如:
//函数的传参,与命令行方式一致的参数列表【输入路径和输出路径】,例如
/user/hive/tb_user /user/hive/tb_prod/pt=${bdp.system.bizdate}
示例代码
以下为Scala代码示例:
package com.host.spark
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory
object WordCount {
val LOG = LoggerFactory.getLogger("ScalaWordCount")
def main(args: Array[String]): Unit = {
//创建一个Config
val conf = new SparkConf()
.setAppName("ScalaWordCount")
//创建SparkContext对象
val sc = new SparkContext(conf)
val value1 = args(0)
//WordCount
val value: RDD[(String, Int)] = sc.textFile(value1)
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.repartition(1)
.sortBy(_._2, false)
value.foreach(v =>{
print(v._1,v._2)
})
print(value1)
//停止SparkContext对象
sc.stop()
}
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>wordcount</artifactId>
<groupId>com.host.wordcount</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>SparkWordCount</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
</goals>
<phase>compile</phase>
</execution>
<execution>
<id>testCompile</id>
<goals>
<goal>testCompile</goal>
</goals>
<phase>test</phase>
</execution>
<execution>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<compilerVersion>1.7</compilerVersion>
</configuration>
</plugin>
</plugins>
</build>
</project>
PySpark
Python任务用于在Spark的Python编程接口(Python API)基础上实现的数据处理程序的周期运行,详细的编码规则请参考 Spark Python API官方文档。
数栈完全按照Spark官方的编程接口,您可以将代码打包,并以资源文件的形式上传到数栈中,然后配置Python任务。
HadoopMR
HadoopMapReduce(HadoopMR)任务,需用户在本地基于Hadoop MapReduce API写好Java代码并打为Jar包,提前将Jar包通过「资源管理」模块上传至平台,之后创建HadoopMR任务时引用此资源。
任务创建
-
资源
HadoopMR任务需引用的Jar包,需提前经「资源管理」上传至平台。一个任务只能引用一个Jar包。
-
mainClass
Jar包的入口类,格式为: org.apache.hadoop.examples
需填写完整类名。
-
任务参数
传参方式与命令行传参形式一致,多个参数用空格隔开,支持填写系统参数或自定义参数(参考 参数配置),例如:
//函数的传参,与命令行方式一致的参数列表【输入路径和输出路径】,例如
/user/hive/tb_user /user/hive/tb_prod/pt=${bdp.system.bizdate}
示例代码
main函数参数列表第一位必须为Configuration |
package org.apache.hadoop.examples.Mapreduce.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
import java.util.StringTokenizer;
public class WordCount
{
//conf值由{product_name_cn}平台管理
//job.submit 提交后需要返回jobId,返回类型为String
public static String main(Configuration conf,String[] args) throws Exception
{
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < otherArgs.length - 1; i++) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[(otherArgs.length - 1)]));
job.submit();
return job.getJobID().toString();
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
this.result.set(sum);
context.write(key, this.result);
}
}
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
{
private static final IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException
{
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
this.word.set(itr.nextToken());
context.write(this.word, one);
}
}
}
}
已有任务集成
Step1:修改pom.xml文件
首先把pom.xml文件导入
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-examples</artifactId>
<version>2.7.3</version>
<description>Apache Hadoop MapReduce Examples</description>
<name>Apache Hadoop MapReduce Examples</name>
<packaging>jar</packaging>
<properties>
<mr.examples.basedir>${basedir}</mr.examples.basedir>
<project.version>2.7.3</project.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>utf-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>org.apache.hadoop.examples.Mapreduce.mr.WordCount</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>
Step2:调整代码
Step2.1:修改main方法列表,代码中使用参数列表中的conf
修改前
/*********** 修改前 ***********/
public static void main(String[] args) throws Exception
{
Configuration conf =new Configuration();
}
修改后
/*********** 修改后 ***********/
public static String main(Configuration conf,String[] args) throws Exception
{
}
Step2.2:job.submit 并返回 jobId
修改前:
System.exit(job.waitForCompletion(true) ? 0 : 1);
修改后:
job.submit();
return job.getJobID().toString();
任务修改-样例
对已有的MR任务进行修改后的jar包如下【离线开发可直接测试使用】:
mainClass:org.apache.hadoop.examples.Mapreduce.mr.WordCount
数据同步
数据同步任务主要完成数据在不同存储单元之间的迁移,详细的数据同步任务配置规范请参考 数据同步模块。
Python
目前支持Python2、Python3的代码,由数栈提交运行,支持在页面中直接运行Python代码或打包上传运行,可以与数栈中的其他任务配合形成调度依赖关系。
默认安装的Python版本为2.7和3.6,且默认安装Miniconda,若用户需要其他Python包,可联系运维人员进行安装。
工作流
工作流是什么
当需要将一组任务串起来运行时,或需要AirFlow或者Azkaban类型的工作流任务时,需要新建「工作流」类型的任务
工作流相当于是一个「空壳」,里面可包含若干「节点」,每个节点相当于一个任务,例如SparkSQL节点,在运行时与一个SparkSQL任务没有差别
工作流的优缺点如下:
-
适用于一组相关度较高的操作,用工作流的方式将其「打包」为一个整体,运行、重跑等场景下操作比较方便
-
整个工作流必须作为整体来运行,在配置依赖关系、小范围重跑等操作时不够灵活
与普通任务的异同点
工作流与普通任务的异同点如下表:
对比项 | 普通任务 | 工作流 |
---|---|---|
任务命名 |
项目空间内唯一 |
工作流,及工作流内部的每个节点都必须在项目空间内唯一 |
依赖配置 |
依赖配置较为灵活 |
整个工作流必须作为一个整体,作为其他任务的上游或下游,不支持将依赖关系指定到工作流内部的节点 |
运维操作 |
可灵活进行重跑、杀死等操作 |
一般是工作流整体进行重跑、杀死等操作,也可以对部分节点进行操作,但此操作比较隐晦,初级用户可能无法找到入口 |
画布操作
-
拖拽:可从左侧拖拽节点至画布,在弹窗中补充相关信息可建立节点
-
画布操作:支持自动整理、放大、缩小、搜索操作,自动整理时可能会发生连线重叠的情况,单击某个节点可高亮相关连线
-
节点连线:hover在某个节点的下部,按住左键可进行节点间的连线。任务将按连线的上下依赖关系运行,上游任务成功后,下游任务才具备运行条件
-
双击节点:进入节点信息编辑页面,例如双击SparkSQL类型的节点,进入SQL编辑页面
-
节点操作:右键单击节点,弹出右键菜单
-
保存节点:保存当前节点的所有信息
-
编辑名称:编辑当前节点的名称
-
编辑节点属性:编辑当前节点的属性信息,例如Spark Jar类型的节点,编辑的弹窗与新建节点的弹窗是一样的,可通过这种方式执行替换Jar包等操作
-
查看节点内容:与双击节点相同
-
删除节点:删除本节点及相关的连线(此操作不可撤销)
-
-
删除连线:右键单击连线,可删除此连线
其他配置与操作
-
节点依赖要求:
-
需设置一个唯一的起点,整个工作流不能有多个起点
-
工作流内部依赖可以有分支,但不能成环
-
-
提交: 工作流需要将每个节点的配置信息分别保存后才可以提交
-
调度与依赖配置:工作流作为一个整体,可以配置调度依赖、上下游依赖等信息。工作流内部的每个节点,可以单独设置是否冻结、重试配置和起调时间
-
起调时间:工作流整体有启动运行的时间(例如每天05:00启动),每个节点可以单独设置起调时间(例如内部几个节点可以在06:00、08:00、10:00启动)
-
节点参数、环境参数:每个节点单独配置,与普通任务相同
|