Reading the Kylo spark-job-profiler source

https://github.com/Teradata/kylo/tree/master/integrations/spark/spark-job-profiler

A Spark job capable of generating profile statistics for a source table, a partition, or a provided query.

Workflow

  • Data is typically read from a source table, such as the feed's _valid table, for a given partition.
  • Profile statistics are generated.
  • Profile statistics are written to the feed's _profile table.
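The three steps can be mirrored, very loosely, in plain Java over an in-memory table. No Spark here; MiniProfiler and its metric names are illustrative stand-ins, not Kylo classes:

```java
import java.util.*;

/** Toy mirror of the profiler pipeline: read rows, profile each column,
 *  emit (column name, metric name, metric value) triples like Kylo's OutputRow. */
public class MiniProfiler {

    public static List<String[]> profile(List<String> columns, List<Object[]> rows) {
        List<String[]> out = new ArrayList<>();
        for (int c = 0; c < columns.size(); c++) {
            long nulls = 0;
            Set<Object> distinct = new HashSet<>();
            for (Object[] row : rows) {
                if (row[c] == null) nulls++;
                else distinct.add(row[c]);
            }
            // Emit a few simple per-column metrics as string triples
            out.add(new String[]{columns.get(c), "TOTAL_COUNT", String.valueOf(rows.size())});
            out.add(new String[]{columns.get(c), "NULL_COUNT", String.valueOf(nulls)});
            out.add(new String[]{columns.get(c), "UNIQUE_COUNT", String.valueOf(distinct.size())});
        }
        return out;
    }
}
```

The real job computes far richer, type-specific statistics, but the shape of the output (column, metric, value) is the same.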

Loading the data

    @Inject
    private SparkContextService scs;

    @Inject
    private SQLContext sqlContext;
    
    DataSet dataDF = scs.toDataSet(sqlContext.createDataFrame(dataRDD, schema));
  • SparkContextService is an interface; currently it has two implementations:
    SparkContextService16
    SparkContextService20

  • DataSet is an interface; currently it has two implementations:
    DataSet16
    DataSet20
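These paired interfaces exist so the same profiler code can compile and run against both Spark 1.6 and Spark 2.0, whose DataFrame APIs differ. The pattern, reduced to its essentials (all names below are illustrative stand-ins, not Kylo classes):

```java
/** Version-abstraction pattern: one interface, one implementation per Spark major version. */
public class VersionAbstraction {

    interface DataSetLike {
        long count();
    }

    static class DataSet16 implements DataSetLike {
        // In Kylo this would delegate to Spark 1.6's DataFrame
        public long count() { return 16; }
    }

    static class DataSet20 implements DataSetLike {
        // In Kylo this would delegate to Spark 2.0's Dataset<Row>
        public long count() { return 20; }
    }

    /** Driver code depends only on the interface, so it is version-agnostic. */
    public static long rows(DataSetLike ds) {
        return ds.count();
    }
}
```

The concrete implementation is chosen at runtime (in Kylo, by dependency injection), so the driver never names a Spark version directly.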

Generating profile statistics

    ProfilerConfiguration configuration = new ProfilerConfiguration();
    
    StatisticsModel statsModel = profiler.profile(dataDF, configuration);
  • The source of ProfilerConfiguration:
package com.thinkbiganalytics.spark.dataprofiler;

/*-
 * #%L
 * kylo-spark-job-profiler-api
 * %%
 * Copyright (C) 2017 ThinkBig Analytics
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *     http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import java.io.Serializable;

/**
 * Helper class to hold parameters for profiler
 */
@SuppressWarnings("unused")
public class ProfilerConfiguration implements Serializable {

    private static final long serialVersionUID = -6099960489540200374L;

    private Integer decimalDigitsToDisplayConsoleOutput = 4;
    private String inputAndOutputTablePartitionKey = "partitionKey";
    private String inputTablePartitionColumnName = "processing_dttm";
    private Integer numberOfTopNValues = 5;
    private String outputDbName = "default";
    private String outputTableName = "profilestats";
    private String outputTablePartitionColumnName = "processing_dttm";
    private String sqlDialect = "hiveql";  // Hive supported HQL
    private Integer bins = 5;

    /**
     * Number of decimals to print out in console<br>
     * (not considered when writing to table)
     */
    public Integer getDecimalDigitsToDisplayConsoleOutput() {
        return decimalDigitsToDisplayConsoleOutput;
    }

    public void setDecimalDigitsToDisplayConsoleOutput(Integer decimalDigitsToDisplayConsoleOutput) {
        this.decimalDigitsToDisplayConsoleOutput = decimalDigitsToDisplayConsoleOutput;
    }

    /**
     * Partition key to read and write to
     */
    public String getInputAndOutputTablePartitionKey() {
        return inputAndOutputTablePartitionKey;
    }

    public void setInputAndOutputTablePartitionKey(String inputAndOutputTablePartitionKey) {
        this.inputAndOutputTablePartitionKey = inputAndOutputTablePartitionKey;
    }

    /**
     * Partition column name for input table
     */
    public String getInputTablePartitionColumnName() {
        return inputTablePartitionColumnName;
    }

    public void setInputTablePartitionColumnName(String inputTablePartitionColumnName) {
        this.inputTablePartitionColumnName = inputTablePartitionColumnName;
    }

    /**
     * N for top-N values to store in result table<br>
     * A required command line parameter
     */
    public Integer getNumberOfTopNValues() {
        return numberOfTopNValues;
    }

    public void setNumberOfTopNValues(Integer numberOfTopNValues) {
        this.numberOfTopNValues = numberOfTopNValues;
    }

    /**
     * Name of database to write result to
     */
    public String getOutputDbName() {
        return outputDbName;
    }

    public void setOutputDbName(String outputDbName) {
        this.outputDbName = outputDbName;
    }

    /**
     * Name of table to write result to<br>
     * A required command line parameter
     */
    public String getOutputTableName() {
        return outputTableName;
    }

    public void setOutputTableName(String outputTableName) {
        this.outputTableName = outputTableName;
    }

    /**
     * Partition column name for output table
     */
    public String getOutputTablePartitionColumnName() {
        return outputTablePartitionColumnName;
    }

    public void setOutputTablePartitionColumnName(String outputTablePartitionColumnName) {
        this.outputTablePartitionColumnName = outputTablePartitionColumnName;
    }

    /**
     * Gets the flavor of queries to run.
     */
    public String getSqlDialect() {
        return sqlDialect;
    }

    public void setSqlDialect(String sqlDialect) {
        this.sqlDialect = sqlDialect;
    }

    public Integer getBins() {
        return this.bins;
    }

    /**
     * Set the number of histogram bins to generate
     * @param bins the number of bins
     */
    public void setBins(Integer bins) {
        this.bins = bins;
    }
}
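A driver typically overrides only a few of these defaults before calling profile(...). A standalone sketch, using a stand-in Config class that mirrors the defaults above (the real ProfilerConfiguration needs the Kylo jars):

```java
/** Minimal stand-in mirroring ProfilerConfiguration's defaults shown above. */
public class ProfilerConfigDemo {

    static class Config {
        String outputDbName = "default";
        String outputTableName = "profilestats";
        String inputAndOutputTablePartitionKey = "partitionKey";
        Integer numberOfTopNValues = 5;
        Integer bins = 5;
    }

    /** Configure as a driver would: pick the partition being profiled and a wider top-N. */
    public static Config configure(String partitionKey, int topN) {
        Config c = new Config();
        c.inputAndOutputTablePartitionKey = partitionKey;  // which partition to read/write
        c.numberOfTopNValues = topN;                       // N for top-N value statistics
        return c;
    }
}
```

Everything not set explicitly falls back to the defaults, so a minimal invocation only needs the output table, top-N, and partition key.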
  • Profiler is an interface; currently its only implementation is StandardProfiler (written in Scala):
package com.thinkbiganalytics.spark.dataprofiler

import com.thinkbiganalytics.spark.dataprofiler.function.PartitionLevelModels
import com.thinkbiganalytics.spark.{DataSet, SparkContextService}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.StructField

/** The standard implementation of `Profiler` that uses Spark to analyze the columns.
  *
  * @param sqlContext          the Spark SQL context
  * @param sparkContextService the Spark context service
  */
class StandardProfiler(val sqlContext: SQLContext, val sparkContextService: SparkContextService) extends Profiler {
    override def profile(dataset: DataSet, profilerConfiguration: ProfilerConfiguration): StatisticsModel = {
        /* Update schema map and broadcast it*/
        val schemaMap = populateSchemaMap(dataset)

        /* Get profile statistics */
        profileStatistics(dataset, schemaMap, profilerConfiguration).orNull
    }

    /** Generates a map from column index to field type.
      *
      * @param dataset the data set
      * @return the schema map
      */
    private def populateSchemaMap(dataset: DataSet): Map[Int, StructField] = {
        dataset.schema().fields.zipWithIndex.map(tuple => (tuple._2, tuple._1)).toMap
    }

    /** Profiles the columns in the specified data set.
      *
      * @param dataset   the data set
      * @param schemaMap the schema map
      * @return the statistics model
      */
    private def profileStatistics(dataset: DataSet, schemaMap: Map[Int, StructField], profilerConfiguration: ProfilerConfiguration): Option[StatisticsModel] = {
        // Get ((column index, column value), count)
        val columnValueCounts = dataset.rdd
            .flatMap((row) => row.toSeq.zipWithIndex.map((tuple) => ((tuple._2, tuple._1), 1)))
            .reduceByKey((a, b) => a + b)

        // Generate the profile model
        val partitionLevelModels = columnValueCounts.mapPartitions(new PartitionLevelModels(schemaMap, profilerConfiguration))
        val result = if (!partitionLevelModels.isEmpty) {
            Option(partitionLevelModels.reduce((a, b) => {
                a.combine(b)
                a
            }))
        } else {
            Option.empty
        }

        if (result.isDefined) {
            // Add histogram statistics to the combined model
            for ((colIdx,field) <- schemaMap) result.get.addAggregate(colIdx, dataset, field);
        }

      result;
    }
}
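The heart of profileStatistics is the flatMap/reduceByKey step that turns rows into ((column index, value), count) pairs. The same aggregation can be sketched in plain Java, with the pair key encoded as a string for simplicity (illustrative only, not Kylo code):

```java
import java.util.*;

/** Mirrors the flatMap/reduceByKey step: count occurrences of each (column index, value) pair. */
public class ColumnValueCounts {

    public static Map<String, Integer> count(List<Object[]> rows) {
        Map<String, Integer> counts = new HashMap<>();
        for (Object[] row : rows) {
            // flatMap: each row yields one (column index, value) pair per column
            for (int col = 0; col < row.length; col++) {
                String key = col + ":" + row[col];
                // reduceByKey((a, b) => a + b): sum the per-pair counts
                counts.merge(key, 1, Integer::sum);
            }
        }
        return counts;
    }
}
```

In Spark this runs distributed, and the per-partition count maps are then folded into partition-level models by mapPartitions and reduce, as shown above.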
  • StatisticsModel is an interface; currently its only implementation is StandardStatisticsModel:
package com.thinkbiganalytics.spark.dataprofiler.model;

/* Apache License 2.0 header omitted; identical to the one shown earlier. */

import com.thinkbiganalytics.spark.DataSet;
import com.thinkbiganalytics.spark.dataprofiler.ColumnStatistics;
import com.thinkbiganalytics.spark.dataprofiler.ProfilerConfiguration;
import com.thinkbiganalytics.spark.dataprofiler.StatisticsModel;
import com.thinkbiganalytics.spark.dataprofiler.columns.*;
import com.thinkbiganalytics.spark.dataprofiler.histo.HistogramStatistics;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;


/**
 * Class to store the profile statistics
 */
public class StandardStatisticsModel implements Serializable, StatisticsModel {

    private static final long serialVersionUID = -6115368868245871747L;

    private static final Logger log = LoggerFactory.getLogger(StandardStatisticsModel.class);
    private final Map<Integer, StandardColumnStatistics> columnStatisticsMap = new HashMap<>();

    @Nonnull
    private final ProfilerConfiguration profilerConfiguration;

    public StandardStatisticsModel(@Nonnull final ProfilerConfiguration profilerConfiguration) {
        this.profilerConfiguration = profilerConfiguration;
    }

    /**
     * Include a column value in calculation of profile statistics for the column
     *
     * @param columnIndex numeric index of column (0-based)
     * @param columnValue value in column
     * @param columnCount number of times value is found in column
     * @param columnField schema information of the column
     */
    public void add(Integer columnIndex, Object columnValue, Long columnCount, StructField columnField) {

        StandardColumnStatistics newColumnStatistics;

        if (!columnStatisticsMap.containsKey(columnIndex)) {
            DataType columnDataType = columnField.dataType();

            switch (columnDataType.simpleString()) {

                /* === Group 1 ===*/

                /*
                 * Hive datatype: 		TINYINT
                 * SparkSQL datatype: 	        tinyint
                 * Java datatype:		Byte
                 */
                case "tinyint":
                    newColumnStatistics = new ByteColumnStatistics(columnField, profilerConfiguration);
                    break;


                /*
                 * Hive datatype: 		SMALLINT
                 * SparkSQL datatype: 	        smallint
                 * Java datatype:		Short
                 */
                case "smallint":
                    newColumnStatistics = new ShortColumnStatistics(columnField, profilerConfiguration);
                    break;


                /*
                 * Hive datatype: 		INT
                 * SparkSQL datatype: 	        int
                 * Java datatype:		Int
                 */
                case "int":
                    newColumnStatistics = new IntegerColumnStatistics(columnField, profilerConfiguration);
                    break;


                /*
                 * Hive datatype: 		BIGINT
                 * SparkSQL datatype: 	        bigint
                 * Java datatype:		Long
                 */
                case "bigint":
                    newColumnStatistics = new LongColumnStatistics(columnField, profilerConfiguration);
                    break;



                /* === Group 2 === */

                /*
                 * Hive datatype: 		FLOAT
                 * SparkSQL datatype: 	        float
                 * Java datatype:		Float
                 */
                case "float":
                    newColumnStatistics = new FloatColumnStatistics(columnField, profilerConfiguration);
                    break;


                /*
                 * Hive datatype: 		DOUBLE
                 * SparkSQL datatype: 	        double
                 * Java datatype:		Double
                 */
                case "double":
                    newColumnStatistics = new DoubleColumnStatistics(columnField, profilerConfiguration);
                    break;



                /* === Group 3 === */

                /*
                 * Hive datatypes: 		STRING, VARCHAR
                 * SparkSQL datatype: 	        string
                 * Java datatype:		String
                 */
                case "string":
                    newColumnStatistics = new StringColumnStatistics(columnField, profilerConfiguration);
                    break;



                /* === Group 4 === */

                /*
                 * Hive datatype: 		BOOLEAN
                 * SparkSQL datatype: 	        boolean
                 * Java datatype:		Boolean
                 */
                case "boolean":
                    newColumnStatistics = new BooleanColumnStatistics(columnField, profilerConfiguration);
                    break;



                /* === Group 5 === */

                /*
                 * Hive datatype: 		DATE
                 * SparkSQL datatype: 	        date
                 * Java datatype:		java.sql.Date
                 */
                case "date":
                    newColumnStatistics = new DateColumnStatistics(columnField, profilerConfiguration);
                    break;


                /*
                 * Hive datatype: 		TIMESTAMP
                 * SparkSQL datatype: 	        timestamp
                 * Java datatype:		java.sql.Timestamp
                 */
                case "timestamp":
                    newColumnStatistics = new TimestampColumnStatistics(columnField, profilerConfiguration);
                    break;



                /* === Group 6 === */

                default:
                    /*
                     * Hive datatype: 		DECIMAL
                     * SparkSQL datatype: 	        decimal
                     * Java datatype:		java.math.BigDecimal
                     *
                     * Handle the decimal type here since it comes with scale and precision e.g. decimal(7,5)
                     */
                    String decimalTypeRegex = "decimal\\S+";
                    if (columnDataType.simpleString().matches(decimalTypeRegex)) {
                        newColumnStatistics = new BigDecimalColumnStatistics(columnField, profilerConfiguration);
                    }

                    /*
                     * Hive datatypes: CHAR, BINARY, ARRAY, MAP, STRUCT, UNIONTYPE
                     */
                    else {
                        if (log.isWarnEnabled()) {
                            log.warn("[PROFILER-INFO] Unsupported data type: {}", columnDataType.simpleString());
                        }
                        newColumnStatistics = new UnsupportedColumnStatistics(columnField, profilerConfiguration);
                    }
            }
            columnStatisticsMap.put(columnIndex, newColumnStatistics);
        }

        StandardColumnStatistics currentColumnStatistics = columnStatisticsMap.get(columnIndex);
        currentColumnStatistics.accomodate(columnValue, columnCount);
    }

    /**
     * Generates additional statistics that requires operations on the entire dataFrame
     *
     * @param columnIndex the column index
     * @param ds          the dataSet
     * @param columnField the column
     */
    public void addAggregate(Integer columnIndex, DataSet ds, StructField columnField) {

        // Generate histogram statistics (numeric columns) and add statistics to model
        if (HistogramStatistics.isNumeric(columnField)) {
            HistogramStatistics histogramStatistics = new HistogramStatistics(profilerConfiguration);
            histogramStatistics.accomodate(columnIndex, ds.javaRDD(), columnField);
            columnStatisticsMap.get(columnIndex).getStatistics().addAll(histogramStatistics.getStatistics());
        }
    }

    protected Double toDoubleFunction(Row row) {
        return row.getDouble(0);
    }

    /**
     * Combine another statistics model
     */
    public void combine(StandardStatisticsModel statisticsModel) {

        for (Integer k_columnIndex : statisticsModel.columnStatisticsMap.keySet()) {

            StandardColumnStatistics columnStatistics = columnStatisticsMap.get(k_columnIndex);
            StandardColumnStatistics v_columnStatistics = statisticsModel.columnStatisticsMap.get(k_columnIndex);

            if (columnStatistics != null) {

                columnStatistics.combine(v_columnStatistics);

            } else {
                columnStatisticsMap.put(k_columnIndex, v_columnStatistics);
            }
        }
    }

    /**
     * Print the profile statistics on console
     *
     * @return profile model string
     */
    private String printModel() {

        StringBuilder sb = new StringBuilder();
        sb.append("====== Statistics Model ======");
        sb.append("\n");

        for (Map.Entry<Integer, StandardColumnStatistics> entry : columnStatisticsMap.entrySet()) {
            sb.append("=== Column #")
                .append(entry.getKey())
                .append("\n");

            sb.append(entry.getValue().getVerboseStatistics())
                .append("\n");
        }

        sb.append("==============================");
        return sb.toString();
    }


    /**
     * Print the profile statistics on console
     */
    @Override
    public String toString() {
        return printModel();
    }


    /**
     * Get the column statistics map (column number mapped to column statistics)
     *
     * @return column statistics map
     */
    @Override
    @SuppressWarnings({"unchecked", "squid:S1905"})
    public Map<Integer, ColumnStatistics> getColumnStatisticsMap() {
        return (Map) columnStatisticsMap;
    }
}
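The combine method implements a simple per-column merge: if both models have statistics for a column, fold them together; otherwise adopt the other model's statistics wholesale. The same merge over plain maps, with a hypothetical Stats class standing in for StandardColumnStatistics:

```java
import java.util.*;

/** Mirrors StandardStatisticsModel.combine: merge per-column partial statistics. */
public class CombineDemo {

    static class Stats {
        long count;
        Stats(long count) { this.count = count; }
        void combine(Stats other) { count += other.count; }
    }

    /** Merge model b into model a and return a, exactly as combine() mutates `this`. */
    public static Map<Integer, Stats> combine(Map<Integer, Stats> a, Map<Integer, Stats> b) {
        for (Map.Entry<Integer, Stats> e : b.entrySet()) {
            Stats mine = a.get(e.getKey());
            if (mine != null) {
                mine.combine(e.getValue());       // column seen by both partial models
            } else {
                a.put(e.getKey(), e.getValue());  // column only in the other model
            }
        }
        return a;
    }
}
```

This is what makes the reduce over partition-level models in StandardProfiler associative enough to run distributed: each partition builds a partial model, and combine folds them pairwise.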

Writing profile statistics to a Hive table

    OutputWriter.writeModel(statisticsModel, profilerConfiguration, sqlContext, sparkContextService);
  • The source of OutputWriter:
package com.thinkbiganalytics.spark.dataprofiler.output;

/* Apache License 2.0 header omitted; identical to the one shown earlier. */

import com.thinkbiganalytics.hive.util.HiveUtils;
import com.thinkbiganalytics.spark.DataSet;
import com.thinkbiganalytics.spark.SparkContextService;
import com.thinkbiganalytics.spark.dataprofiler.ColumnStatistics;
import com.thinkbiganalytics.spark.dataprofiler.ProfilerConfiguration;
import com.thinkbiganalytics.spark.dataprofiler.StatisticsModel;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;

import javax.annotation.Nonnull;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/**
 * Class to write profile statistics result to Hive table
 */
public class OutputWriter implements Serializable {

    private static final long serialVersionUID = -1250818467175932284L;

    /**
     * Write the profile statistics to Hive.
     */
    public static void writeModel(@Nonnull final StatisticsModel model, @Nonnull final ProfilerConfiguration profilerConfiguration, @Nonnull final SQLContext sqlContext,
                                  @Nonnull final SparkContextService scs) {
        final OutputWriter writer = new OutputWriter(profilerConfiguration);

        for (final ColumnStatistics column : model.getColumnStatisticsMap().values()) {
            writer.addRows(column.getStatistics());
        }

        writer.writeResultToTable(sqlContext, scs);
    }

    private final List<OutputRow> outputRows = new ArrayList<>();

    @Nonnull
    private final ProfilerConfiguration profilerConfiguration;

    private OutputWriter(@Nonnull final ProfilerConfiguration profilerConfiguration) {
        this.profilerConfiguration = profilerConfiguration;
    }

    /**
     * Helper method:
     * Check if output configuration (db, table, partition column, partition key) has been set
     */
    private boolean checkOutputConfigSettings() {
        return !((profilerConfiguration.getOutputDbName() == null)
                 || (profilerConfiguration.getOutputTableName() == null)
                 || (profilerConfiguration.getOutputTablePartitionColumnName() == null)
                 || (profilerConfiguration.getInputAndOutputTablePartitionKey() == null));
    }

    /**
     * Add multiple rows to write in output
     *
     * @param rows list of rows for output
     */
    public void addRows(List<OutputRow> rows) {
        outputRows.addAll(rows);
    }

    /**
     * Write result to Hive table
     *
     * @return boolean indicating result of write
     */
    @SuppressWarnings("unchecked")
    public boolean writeResultToTable(@Nonnull final SQLContext sqlContext, @Nonnull final SparkContextService scs) {
        boolean retVal = false;

        if (!checkOutputConfigSettings()) {
            System.out.println("Error writing result: Output database/table/partition column/partition key not set.");
        } else if (sqlContext == null) {
            System.out.println("Error writing result: Spark context is not available.");
        } else {

            @SuppressWarnings("squid:S2095") final JavaRDD<OutputRow> outputRowsRDD = JavaSparkContext.fromSparkContext(sqlContext.sparkContext()).parallelize(outputRows);
            DataSet outputRowsDF = scs.toDataSet(sqlContext, outputRowsRDD, OutputRow.class);
            //outputRowsDF.write().mode(SaveMode.Overwrite).saveAsTable(outputTable);

            // Since Spark doesn't support partitions, write to temp table, then write to partitioned table
            String tempTable = profilerConfiguration.getOutputTableName() + "_" + System.currentTimeMillis();
            outputRowsDF.registerTempTable(tempTable);

            createOutputTableIfNotExists(sqlContext, scs);
            writeResultToOutputTable(sqlContext, scs, tempTable);
            retVal = true;
        }

        return retVal;
    }


    /**
     * Create output table if does not exist
     */
    private void createOutputTableIfNotExists(@Nonnull final SQLContext sqlContext, @Nonnull final SparkContextService scs) {
        String createTableSQL = "CREATE TABLE IF NOT EXISTS " + HiveUtils.quoteIdentifier(profilerConfiguration.getOutputDbName(), profilerConfiguration.getOutputTableName()) + "\n"
                                + "(columnname STRING, metrictype STRING, metricvalue STRING)\n"
                                + "PARTITIONED BY (" + profilerConfiguration.getOutputTablePartitionColumnName() + " STRING)\n"
                                + "ROW FORMAT DELIMITED\n"
                                + "FIELDS TERMINATED BY ','\n"
                                + "STORED AS TEXTFILE";

        scs.sql(sqlContext, createTableSQL);
    }


    /**
     * Write to output table
     */
    private void writeResultToOutputTable(@Nonnull final SQLContext sqlContext, @Nonnull final SparkContextService scs, @Nonnull final String tempTable) {
        String insertTableSQL = "INSERT INTO TABLE " + HiveUtils.quoteIdentifier(profilerConfiguration.getOutputDbName(), profilerConfiguration.getOutputTableName())
                                + " PARTITION (" + HiveUtils.quoteIdentifier(profilerConfiguration.getOutputTablePartitionColumnName()) + "="
                                + HiveUtils.quoteString(profilerConfiguration.getInputAndOutputTablePartitionKey()) + ")"
                                + " SELECT columnname,metrictype,metricvalue FROM " + HiveUtils.quoteIdentifier(tempTable);

        scs.sql(sqlContext, insertTableSQL);

        System.out.println("[PROFILER-INFO] Metrics written to Hive table: "
                           + profilerConfiguration.getOutputDbName() + "." + profilerConfiguration.getOutputTableName()
                           + " Partition: (" + profilerConfiguration.getOutputTablePartitionColumnName() + "='" + profilerConfiguration.getInputAndOutputTablePartitionKey() + "')"
                           + " [" + outputRows.size() + " rows]");
    }
}
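writeResultToTable stages the rows in a temp table and then issues two Hive statements. Their construction can be sketched without Spark; the hand-rolled backtick quoting below stands in for HiveUtils, which is not shown in this article:

```java
/** Mirrors OutputWriter's SQL generation (quoting is a simplification of HiveUtils). */
public class ProfilerSql {

    static String quote(String identifier) {
        return "`" + identifier + "`";
    }

    /** CREATE TABLE for the partitioned output table, as in createOutputTableIfNotExists. */
    public static String createTableSql(String db, String table, String partitionCol) {
        return "CREATE TABLE IF NOT EXISTS " + quote(db) + "." + quote(table) + "\n"
             + "(columnname STRING, metrictype STRING, metricvalue STRING)\n"
             + "PARTITIONED BY (" + partitionCol + " STRING)\n"
             + "ROW FORMAT DELIMITED\n"
             + "FIELDS TERMINATED BY ','\n"
             + "STORED AS TEXTFILE";
    }

    /** INSERT ... PARTITION ... SELECT from the temp table, as in writeResultToOutputTable. */
    public static String insertSql(String db, String table, String partitionCol,
                                   String partitionKey, String tempTable) {
        return "INSERT INTO TABLE " + quote(db) + "." + quote(table)
             + " PARTITION (" + quote(partitionCol) + "='" + partitionKey + "')"
             + " SELECT columnname,metrictype,metricvalue FROM " + quote(tempTable);
    }
}
```

The temp-table detour exists because, as the source comment notes, the DataFrame writer in use cannot target a Hive partition directly, so the rows are registered as a temp table and moved with plain HiveQL.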
  • The source of OutputRow:
package com.thinkbiganalytics.spark.dataprofiler.output;

/* Apache License 2.0 header omitted; identical to the one shown earlier. */

import java.io.Serializable;

/**
 * Class to represent a row in profile statistics output<br>
 * Format of output:<br>
 *
 * ColumnName, MetricType, MetricValue
 */
@SuppressWarnings("unused")
public class OutputRow implements Serializable {

    private static final long serialVersionUID = -8872905670704304249L;

    private String columnName;
    private String metricType;
    private String metricValue;

    /**
     * No-argument constructor
     */
    public OutputRow() {
        columnName = null;
        metricType = null;
        metricValue = null;
    }

    /**
     * Three-argument constructor to create a new row
     *
     * @param columnName  name of column
     * @param metricType  metric type
     * @param metricValue metric value
     */
    public OutputRow(String columnName, String metricType, String metricValue) {
        this.columnName = columnName;
        this.metricType = metricType;
        this.metricValue = metricValue;
    }

    /**
     * Get the column name
     *
     * @return column name
     */
    public String getColumnName() {
        return columnName;
    }

    /**
     * Set the column name
     *
     * @param columnName column name
     */
    public void setColumnName(String columnName) {
        this.columnName = columnName;
    }

    /**
     * Get the metric type
     *
     * @return metric type
     */
    public String getMetricType() {
        return metricType;
    }

    /**
     * Set the metric type
     *
     * @param metricType metric type
     */
    public void setMetricType(String metricType) {
        this.metricType = metricType;
    }

    /**
     * Get the metric value
     *
     * @return metric value
     */
    public String getMetricValue() {
        return metricValue;
    }

    /**
     * Set the metric value
     *
     * @param metricValue metric value
     */
    public void setMetricValue(String metricValue) {
        this.metricValue = metricValue;
    }

    /**
     * Set values for the row
     *
     * @param columnName  name of column
     * @param metricType  metric type
     * @param metricValue metric value
     */
    public void setValues(String columnName, String metricType, String metricValue) {
        this.columnName = columnName;
        this.metricType = metricType;
        this.metricValue = metricValue;
    }

    /**
     * Print verbose description of row to console
     */
    @Override
    public String toString() {
        return "OutputRow [columnName=" + columnName + ", metricType=" + metricType + ", metricValue=" + metricValue
               + "]";
    }
}

Kylo UI

Entry point

https://github.com/Teradata/kylo/tree/master/integrations/spark/spark-job-profiler/spark-job-profiler-app/src/main/java/com/thinkbiganalytics/spark/dataprofiler/core
