Kylo 之 spark-job-profiler 源码阅读,
https://github.com/Teradata/kylo/tree/master/integrations/spark/spark-job-profiler
A Spark job capable of generating profile statistics against a source table, partition, or a provided query.
流程
- Data is typically read from a source table (such as the feed's `_valid` table) and a given partition.
- Profile statistics are generated.
- Profile statistics are written to the feed's `_profile` table.
加载数据
@Inject
private SparkContextService scs;
@Inject
private SQLContext sqlContext;
DataSet dataDF = scs.toDataSet(sqlContext.createDataFrame(dataRDD, schema));
-
SparkContextService 是一个接口,目前,实现 该接口的 class 有两个:
SparkContextService16
SparkContextService20 -
DataSet 是一个接口,目前,实现 该接口的 class 有两个:
DataSet16
DataSet20
生成 Profile statistics
ProfilerConfiguration configuration = new ProfilerConfiguration();
StatisticsModel statsModel = profiler.profile(dataDF, configuration);
- ProfilerConfiguration 的源码如下
package com.thinkbiganalytics.spark.dataprofiler;
/*-
* #%L
* kylo-spark-job-profiler-api
* %%
* Copyright (C) 2017 ThinkBig Analytics
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import java.io.Serializable;
/**
* Helper class to hold parameters for profiler
*/
@SuppressWarnings("unused")
public class ProfilerConfiguration implements Serializable {

    private static final long serialVersionUID = -6099960489540200374L;

    // Console formatting
    private Integer decimalDigitsToDisplayConsoleOutput = 4;

    // Input/output table coordinates
    private String inputAndOutputTablePartitionKey = "partitionKey";
    private String inputTablePartitionColumnName = "processing_dttm";
    private String outputDbName = "default";
    private String outputTableName = "profilestats";
    private String outputTablePartitionColumnName = "processing_dttm";

    // Profiling parameters
    private Integer numberOfTopNValues = 5;
    private Integer bins = 5;

    // Query flavor; Hive supports HQL
    private String sqlDialect = "hiveql";

    /**
     * Number of decimal places used when printing statistics to the console.
     * Ignored when statistics are written to a table.
     */
    public Integer getDecimalDigitsToDisplayConsoleOutput() {
        return decimalDigitsToDisplayConsoleOutput;
    }

    public void setDecimalDigitsToDisplayConsoleOutput(Integer decimalDigitsToDisplayConsoleOutput) {
        this.decimalDigitsToDisplayConsoleOutput = decimalDigitsToDisplayConsoleOutput;
    }

    /**
     * Partition key shared by the input and output tables.
     */
    public String getInputAndOutputTablePartitionKey() {
        return inputAndOutputTablePartitionKey;
    }

    public void setInputAndOutputTablePartitionKey(String inputAndOutputTablePartitionKey) {
        this.inputAndOutputTablePartitionKey = inputAndOutputTablePartitionKey;
    }

    /**
     * Name of the partition column of the input table.
     */
    public String getInputTablePartitionColumnName() {
        return inputTablePartitionColumnName;
    }

    public void setInputTablePartitionColumnName(String inputTablePartitionColumnName) {
        this.inputTablePartitionColumnName = inputTablePartitionColumnName;
    }

    /**
     * N for the top-N values stored in the result table.
     * Supplied as a required command line parameter.
     */
    public Integer getNumberOfTopNValues() {
        return numberOfTopNValues;
    }

    public void setNumberOfTopNValues(Integer numberOfTopNValues) {
        this.numberOfTopNValues = numberOfTopNValues;
    }

    /**
     * Database that receives the profiling result.
     */
    public String getOutputDbName() {
        return outputDbName;
    }

    public void setOutputDbName(String outputDbName) {
        this.outputDbName = outputDbName;
    }

    /**
     * Table that receives the profiling result.
     * Supplied as a required command line parameter.
     */
    public String getOutputTableName() {
        return outputTableName;
    }

    public void setOutputTableName(String outputTableName) {
        this.outputTableName = outputTableName;
    }

    /**
     * Name of the partition column of the output table.
     */
    public String getOutputTablePartitionColumnName() {
        return outputTablePartitionColumnName;
    }

    public void setOutputTablePartitionColumnName(String outputTablePartitionColumnName) {
        this.outputTablePartitionColumnName = outputTablePartitionColumnName;
    }

    /**
     * Flavor of SQL queries to run (defaults to {@code "hiveql"}).
     */
    public String getSqlDialect() {
        return sqlDialect;
    }

    public void setSqlDialect(String sqlDialect) {
        this.sqlDialect = sqlDialect;
    }

    /**
     * Number of histogram bins to generate.
     */
    public Integer getBins() {
        return this.bins;
    }

    public void setBins(Integer bins) {
        this.bins = bins;
    }
}
- Profiler 是一个接口,目前,它唯一的实现为 StandardProfiler(Scala 实现)
package com.thinkbiganalytics.spark.dataprofiler
import com.thinkbiganalytics.spark.dataprofiler.function.PartitionLevelModels
import com.thinkbiganalytics.spark.{DataSet, SparkContextService}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.StructField
/** The standard implementation of `Profiler` that uses Spark to analyze the columns of a data set.
  *
  * @param sqlContext the Spark SQL context
  * @param sparkContextService the Spark context service (abstracts Spark 1.6 vs 2.0 APIs)
  */
class StandardProfiler(val sqlContext: SQLContext, val sparkContextService: SparkContextService) extends Profiler {
/** Profiles every column of `dataset` and returns the resulting statistics model,
  * or `null` when the data set is empty (no partition-level models were produced).
  */
override def profile(dataset: DataSet, profilerConfiguration: ProfilerConfiguration): StatisticsModel = {
/* Build the column-index -> field map.
 * NOTE(review): despite the original "broadcast it" comment, no Spark broadcast
 * variable is created here; the map is captured by closure serialization. */
val schemaMap = populateSchemaMap(dataset)
/* Get profile statistics */
profileStatistics(dataset, schemaMap, profilerConfiguration).orNull
}
/** Generates a map from column index to field type.
  *
  * @param dataset the data set
  * @return map of 0-based column index to the column's `StructField`
  */
private def populateSchemaMap(dataset: DataSet): Map[Int, StructField] = {
dataset.schema().fields.zipWithIndex.map(tuple => (tuple._2, tuple._1)).toMap
}
/** Profiles the columns in the specified data set.
  *
  * @param dataset the data set
  * @param schemaMap the schema map
  * @return `Some(model)` with the combined statistics, or `None` for an empty data set
  */
private def profileStatistics(dataset: DataSet, schemaMap: Map[Int, StructField], profilerConfiguration: ProfilerConfiguration): Option[StatisticsModel] = {
// Get ((column index, column value), count) for every distinct value in every column
val columnValueCounts = dataset.rdd
.flatMap((row) => row.toSeq.zipWithIndex.map((tuple) => ((tuple._2, tuple._1), 1)))
.reduceByKey((a, b) => a + b)
// Build one partial StatisticsModel per partition from the value counts
val partitionLevelModels = columnValueCounts.mapPartitions(new PartitionLevelModels(schemaMap, profilerConfiguration))
// NOTE(review): RDD.isEmpty triggers a Spark job; the reduce below mutates the
// left accumulator via combine() and returns it, which relies on Spark allowing
// in-place mutation of the reduce operand — confirm before restructuring.
val result = if (!partitionLevelModels.isEmpty) {
Option(partitionLevelModels.reduce((a, b) => {
a.combine(b)
a
}))
} else {
Option.empty
}
if (result.isDefined) {
// Add histogram statistics (whole-dataset aggregates) to the combined model
for ((colIdx,field) <- schemaMap) result.get.addAggregate(colIdx, dataset, field);
}
result;
}
}
- StatisticsModel 是一个接口,目前,它唯一的实现为 StandardStatisticsModel
package com.thinkbiganalytics.spark.dataprofiler.model;
/*-
* #%L
* thinkbig-spark-job-profiler-app
* %%
* Copyright (C) 2017 ThinkBig Analytics
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import com.thinkbiganalytics.spark.DataSet;
import com.thinkbiganalytics.spark.dataprofiler.ColumnStatistics;
import com.thinkbiganalytics.spark.dataprofiler.ProfilerConfiguration;
import com.thinkbiganalytics.spark.dataprofiler.StatisticsModel;
import com.thinkbiganalytics.spark.dataprofiler.columns.*;
import com.thinkbiganalytics.spark.dataprofiler.histo.HistogramStatistics;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
/**
* Class to store the profile statistics
*/
public class StandardStatisticsModel implements Serializable, StatisticsModel {

    private static final long serialVersionUID = -6115368868245871747L;

    private static final Logger log = LoggerFactory.getLogger(StandardStatisticsModel.class);

    /** Per-column statistics, keyed by 0-based column index. */
    private final Map<Integer, StandardColumnStatistics> columnStatisticsMap = new HashMap<>();

    @Nonnull
    private final ProfilerConfiguration profilerConfiguration;

    public StandardStatisticsModel(@Nonnull final ProfilerConfiguration profilerConfiguration) {
        this.profilerConfiguration = profilerConfiguration;
    }

    /**
     * Include a column value in calculation of profile statistics for the column.
     * Lazily creates the type-appropriate statistics holder on first sight of a column.
     *
     * @param columnIndex numeric index of column (0-based)
     * @param columnValue value in column
     * @param columnCount number of times value is found in column
     * @param columnField schema information of the column
     */
    public void add(Integer columnIndex, Object columnValue, Long columnCount, StructField columnField) {
        if (!columnStatisticsMap.containsKey(columnIndex)) {
            columnStatisticsMap.put(columnIndex, newStatisticsFor(columnField));
        }
        columnStatisticsMap.get(columnIndex).accomodate(columnValue, columnCount);
    }

    /**
     * Creates the statistics holder matching the Spark SQL type of the given field.
     * Dispatches on {@code DataType.simpleString()}, which yields the lowercase
     * SparkSQL type name (e.g. "int", "string", "decimal(7,5)").
     *
     * @param columnField schema information of the column
     * @return a new type-specific {@link StandardColumnStatistics}
     */
    private StandardColumnStatistics newStatisticsFor(StructField columnField) {
        DataType columnDataType = columnField.dataType();
        switch (columnDataType.simpleString()) {
            /* === Group 1: integral types (Hive TINYINT/SMALLINT/INT/BIGINT) === */
            case "tinyint":   // Java: Byte
                return new ByteColumnStatistics(columnField, profilerConfiguration);
            case "smallint":  // Java: Short
                return new ShortColumnStatistics(columnField, profilerConfiguration);
            case "int":       // Java: Integer
                return new IntegerColumnStatistics(columnField, profilerConfiguration);
            case "bigint":    // Java: Long
                return new LongColumnStatistics(columnField, profilerConfiguration);
            /* === Group 2: floating point (Hive FLOAT/DOUBLE) === */
            case "float":     // Java: Float
                return new FloatColumnStatistics(columnField, profilerConfiguration);
            case "double":    // Java: Double
                return new DoubleColumnStatistics(columnField, profilerConfiguration);
            /* === Group 3: Hive STRING, VARCHAR === */
            case "string":    // Java: String
                return new StringColumnStatistics(columnField, profilerConfiguration);
            /* === Group 4: Hive BOOLEAN === */
            case "boolean":   // Java: Boolean
                return new BooleanColumnStatistics(columnField, profilerConfiguration);
            /* === Group 5: temporal types (Hive DATE/TIMESTAMP) === */
            case "date":      // Java: java.sql.Date
                return new DateColumnStatistics(columnField, profilerConfiguration);
            case "timestamp": // Java: java.sql.Timestamp
                return new TimestampColumnStatistics(columnField, profilerConfiguration);
            /* === Group 6: everything else === */
            default:
                /*
                 * DECIMAL carries scale and precision, e.g. "decimal(7,5)", so it cannot
                 * be matched by a fixed case label.
                 * NOTE(review): the regex "decimal\\S+" requires at least one character
                 * after "decimal" — a bare "decimal" simpleString would not match; confirm
                 * Spark always renders the precision suffix.
                 */
                if (columnDataType.simpleString().matches("decimal\\S+")) {
                    return new BigDecimalColumnStatistics(columnField, profilerConfiguration);
                }
                /* Hive CHAR, BINARY, ARRAY, MAP, STRUCT, UNIONTYPE fall through here. */
                if (log.isWarnEnabled()) {
                    log.warn("[PROFILER-INFO] Unsupported data type: {}", columnDataType.simpleString());
                }
                return new UnsupportedColumnStatistics(columnField, profilerConfiguration);
        }
    }

    /**
     * Generates additional statistics that require operations on the entire dataFrame.
     * Currently adds histogram statistics for numeric columns only.
     *
     * @param columnIndex the column index
     * @param ds          the dataSet
     * @param columnField the column
     */
    public void addAggregate(Integer columnIndex, DataSet ds, StructField columnField) {
        // Generate histogram statistics (numeric columns) and add statistics to model
        if (HistogramStatistics.isNumeric(columnField)) {
            HistogramStatistics histogramStatistics = new HistogramStatistics(profilerConfiguration);
            histogramStatistics.accomodate(columnIndex, ds.javaRDD(), columnField);
            columnStatisticsMap.get(columnIndex).getStatistics().addAll(histogramStatistics.getStatistics());
        }
    }

    /** Extracts the first column of a row as a double (kept protected for subclasses). */
    protected Double toDoubleFunction(Row row) {
        return row.getDouble(0);
    }

    /**
     * Merge another statistics model into this one: columns present in both are
     * combined; columns only present in the other model are adopted as-is.
     *
     * @param statisticsModel the model to fold into this one
     */
    public void combine(StandardStatisticsModel statisticsModel) {
        for (Integer k_columnIndex : statisticsModel.columnStatisticsMap.keySet()) {
            StandardColumnStatistics columnStatistics = columnStatisticsMap.get(k_columnIndex);
            StandardColumnStatistics v_columnStatistics = statisticsModel.columnStatisticsMap.get(k_columnIndex);
            if (columnStatistics != null) {
                columnStatistics.combine(v_columnStatistics);
            } else {
                columnStatisticsMap.put(k_columnIndex, v_columnStatistics);
            }
        }
    }

    /**
     * Render the profile statistics for console display.
     *
     * @return profile model string
     */
    private String printModel() {
        StringBuilder sb = new StringBuilder();
        sb.append("====== Statistics Model ======");
        sb.append("\n");
        for (Map.Entry<Integer, StandardColumnStatistics> entry : columnStatisticsMap.entrySet()) {
            sb.append("=== Column #")
                .append(entry.getKey())
                .append("\n");
            sb.append(entry.getValue().getVerboseStatistics())
                .append("\n");
        }
        sb.append("==============================");
        return sb.toString();
    }

    /**
     * Verbose, console-oriented rendering of the model.
     */
    @Override
    public String toString() {
        return printModel();
    }

    /**
     * Get the column statistics map (column number mapped to column statistics).
     *
     * @return column statistics map
     */
    @Override
    @SuppressWarnings({"unchecked", "squid:S1905"})
    public Map<Integer, ColumnStatistics> getColumnStatisticsMap() {
        // Raw-cast: StandardColumnStatistics implements ColumnStatistics; the interface
        // requires the wider value type.
        return (Map) columnStatisticsMap;
    }
}
Profiler statistics 保存至 Hive 表
OutputWriter.writeModel(statisticsModel, profilerConfiguration, sqlContext, sparkContextService);
- OutputWriter 的源码如下
package com.thinkbiganalytics.spark.dataprofiler.output;
/*-
* #%L
* thinkbig-spark-job-profiler-app
* %%
* Copyright (C) 2017 ThinkBig Analytics
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import com.thinkbiganalytics.hive.util.HiveUtils;
import com.thinkbiganalytics.spark.DataSet;
import com.thinkbiganalytics.spark.SparkContextService;
import com.thinkbiganalytics.spark.dataprofiler.ColumnStatistics;
import com.thinkbiganalytics.spark.dataprofiler.ProfilerConfiguration;
import com.thinkbiganalytics.spark.dataprofiler.StatisticsModel;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import javax.annotation.Nonnull;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/**
* Class to write profile statistics result to Hive table
*/
public class OutputWriter implements Serializable {

    private static final long serialVersionUID = -1250818467175932284L;

    /**
     * Write the profile statistics to Hive: collect all output rows from the model,
     * then persist them into the configured partitioned output table.
     *
     * @param model                 the statistics model to persist
     * @param profilerConfiguration output table/partition configuration
     * @param sqlContext            the Spark SQL context
     * @param scs                   the Spark context service
     */
    public static void writeModel(@Nonnull final StatisticsModel model, @Nonnull final ProfilerConfiguration profilerConfiguration, @Nonnull final SQLContext sqlContext,
                                  @Nonnull final SparkContextService scs) {
        final OutputWriter writer = new OutputWriter(profilerConfiguration);
        for (final ColumnStatistics column : model.getColumnStatisticsMap().values()) {
            writer.addRows(column.getStatistics());
        }
        writer.writeResultToTable(sqlContext, scs);
    }

    /** Accumulated rows pending output. */
    private final List<OutputRow> outputRows = new ArrayList<>();

    @Nonnull
    private final ProfilerConfiguration profilerConfiguration;

    private OutputWriter(@Nonnull final ProfilerConfiguration profilerConfiguration) {
        this.profilerConfiguration = profilerConfiguration;
    }

    /**
     * Helper method:
     * Check if output configuration (db, table, partition column, partition key) has been set.
     */
    private boolean checkOutputConfigSettings() {
        return !((profilerConfiguration.getOutputDbName() == null)
                 || (profilerConfiguration.getOutputTableName() == null)
                 || (profilerConfiguration.getOutputTablePartitionColumnName() == null)
                 || (profilerConfiguration.getInputAndOutputTablePartitionKey() == null));
    }

    /**
     * Add multiple rows to write in output.
     *
     * @param rows list of rows for output
     */
    public void addRows(List<OutputRow> rows) {
        outputRows.addAll(rows);
    }

    /**
     * Write result to Hive table.
     *
     * @return boolean indicating result of write
     */
    @SuppressWarnings("unchecked")
    public boolean writeResultToTable(@Nonnull final SQLContext sqlContext, @Nonnull final SparkContextService scs) {
        boolean retVal = false;
        if (!checkOutputConfigSettings()) {
            System.out.println("Error writing result: Output database/table/partition column/partition key not set.");
        } else if (sqlContext == null) {
            System.out.println("Error writing result: Spark context is not available.");
        } else {
            @SuppressWarnings("squid:S2095") final JavaRDD<OutputRow> outputRowsRDD = JavaSparkContext.fromSparkContext(sqlContext.sparkContext()).parallelize(outputRows);
            DataSet outputRowsDF = scs.toDataSet(sqlContext, outputRowsRDD, OutputRow.class);
            // Spark's saveAsTable doesn't support writing into a Hive partition directly,
            // so stage the rows in a temp table and INSERT...SELECT into the partitioned table.
            String tempTable = profilerConfiguration.getOutputTableName() + "_" + System.currentTimeMillis();
            outputRowsDF.registerTempTable(tempTable);
            createOutputTableIfNotExists(sqlContext, scs);
            writeResultToOutputTable(sqlContext, scs, tempTable);
            retVal = true;
        }
        return retVal;
    }

    /**
     * Create output table if it does not exist.
     * Column names mirror the {@link OutputRow} bean (columnName, metricType, metricValue).
     */
    private void createOutputTableIfNotExists(@Nonnull final SQLContext sqlContext, @Nonnull final SparkContextService scs) {
        // Fix: the metric-type column was previously declared as "metricname", which did not
        // match the OutputRow field (metricType) nor the column list selected by the INSERT.
        // Hive INSERT...SELECT matches positionally so it worked, but the schema was mislabeled.
        String createTableSQL = "CREATE TABLE IF NOT EXISTS " + HiveUtils.quoteIdentifier(profilerConfiguration.getOutputDbName(), profilerConfiguration.getOutputTableName()) + "\n"
                                + "(columnname STRING, metrictype STRING, metricvalue STRING)\n"
                                + "PARTITIONED BY (" + HiveUtils.quoteIdentifier(profilerConfiguration.getOutputTablePartitionColumnName()) + " STRING)\n"
                                + "ROW FORMAT DELIMITED\n"
                                + "FIELDS TERMINATED BY ','\n"
                                + "STORED AS TEXTFILE";
        scs.sql(sqlContext, createTableSQL);
    }

    /**
     * Insert the staged rows from the temp table into the configured output partition.
     *
     * @param tempTable name of the temp table holding the staged rows
     */
    private void writeResultToOutputTable(@Nonnull final SQLContext sqlContext, @Nonnull final SparkContextService scs, @Nonnull final String tempTable) {
        String insertTableSQL = "INSERT INTO TABLE " + HiveUtils.quoteIdentifier(profilerConfiguration.getOutputDbName(), profilerConfiguration.getOutputTableName())
                                + " PARTITION (" + HiveUtils.quoteIdentifier(profilerConfiguration.getOutputTablePartitionColumnName()) + "="
                                + HiveUtils.quoteString(profilerConfiguration.getInputAndOutputTablePartitionKey()) + ")"
                                + " SELECT columnname,metrictype,metricvalue FROM " + HiveUtils.quoteIdentifier(tempTable);
        scs.sql(sqlContext, insertTableSQL);
        System.out.println("[PROFILER-INFO] Metrics written to Hive table: "
                           + profilerConfiguration.getOutputDbName() + "." + profilerConfiguration.getOutputTableName()
                           + " Partition: (" + profilerConfiguration.getOutputTablePartitionColumnName() + "='" + profilerConfiguration.getInputAndOutputTablePartitionKey() + "')"
                           + " [" + outputRows.size() + " rows]");
    }
}
- OutputRow 的源码如下
package com.thinkbiganalytics.spark.dataprofiler.output;
/*-
* #%L
* thinkbig-spark-job-profiler-app
* %%
* Copyright (C) 2017 ThinkBig Analytics
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import java.io.Serializable;
/**
* Class to represent a row in profile statistics output<br>
* Format of output:<br>
*
* ColumnName, MetricType, MetricValue
*/
@SuppressWarnings("unused")
public class OutputRow implements Serializable {

    private static final long serialVersionUID = -8872905670704304249L;

    // One output record: (column name, metric type, metric value)
    private String columnName;
    private String metricType;
    private String metricValue;

    /**
     * No-argument constructor; all fields start out null.
     */
    public OutputRow() {
        this(null, null, null);
    }

    /**
     * Three-argument constructor to create a fully populated row.
     *
     * @param columnName  name of column
     * @param metricType  metric type
     * @param metricValue metric value
     */
    public OutputRow(String columnName, String metricType, String metricValue) {
        setValues(columnName, metricType, metricValue);
    }

    /** @return the column name */
    public String getColumnName() {
        return columnName;
    }

    /** @param columnName the column name to set */
    public void setColumnName(String columnName) {
        this.columnName = columnName;
    }

    /** @return the metric type */
    public String getMetricType() {
        return metricType;
    }

    /** @param metricType the metric type to set */
    public void setMetricType(String metricType) {
        this.metricType = metricType;
    }

    /** @return the metric value */
    public String getMetricValue() {
        return metricValue;
    }

    /** @param metricValue the metric value to set */
    public void setMetricValue(String metricValue) {
        this.metricValue = metricValue;
    }

    /**
     * Set all three values of the row at once.
     *
     * @param columnName  name of column
     * @param metricType  metric type
     * @param metricValue metric value
     */
    public void setValues(String columnName, String metricType, String metricValue) {
        this.columnName = columnName;
        this.metricType = metricType;
        this.metricValue = metricValue;
    }

    /**
     * Verbose description of the row, e.g. for console output.
     */
    @Override
    public String toString() {
        return String.format("OutputRow [columnName=%s, metricType=%s, metricValue=%s]",
                             columnName, metricType, metricValue);
    }
}
kylo UI
流程入口
https://github.com/Teradata/kylo/tree/master/integrations/spark/spark-job-profiler/spark-job-profiler-app/src/main/java/com/thinkbiganalytics/spark/dataprofiler/core
本站文章为和通数据库网友分享或者投稿,欢迎任何形式的转载,但请务必注明出处.
同时文章内容如有侵犯了您的权益,请联系QQ:970679559,我们会在尽快处理。