PySpark Example Code

Development Description

The CloudTable HBase and MRS HBase can be connected to DLI as data sources.

  • Prerequisites

    A datasource connection has been created on the DLI management console.


    Hard-coded or plaintext passwords pose significant security risks. To ensure security, encrypt your passwords, store them in configuration files or environment variables, and decrypt them when needed.

  • Code implementation

    1. Import dependency packages.

      from __future__ import print_function
      from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, ShortType, LongType, FloatType, DoubleType
      from pyspark.sql import SparkSession
    2. Create a session.

      sparkSession = SparkSession.builder.appName("datasource-hbase").getOrCreate()
  • Connecting to data sources through SQL APIs

    1. Create a table to connect to an HBase data source.

      • The sample code is applicable, if Kerberos authentication is disabled for the interconnected HBase cluster:

            "CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS (\
            'ZKHost' = '',\
            'TableName' = 'hbtest',\
            'RowKey' = 'id:5',\
            'Cols' = 'location:info.location,')")
      • The sample code is applicable, if Kerberos authentication is enabled for the interconnected HBase cluster:

            "CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS (\
            'ZKHost' = '',\
            'TableName' = 'hbtest',\
            'RowKey' = 'id:5',\
            'Cols' = 'location:info.location,',\
            'krb5conf' = './krb5.conf',\
            'principal' ='krbtest')")

        If Kerberos authentication is enabled, you need to set three more parameters, as listed in Table 1.

        Table 1 Description

        Parameter and Value


        'krb5conf' = './krb5.conf'

        Path of the krb5.conf file.


        Path of the keytab file.

        'principal' ='krbtest'

        Authentication username.

        For details about how to obtain the krb5.conf and keytab files, see Completing Configurations for Enabling Kerberos Authentication.


    2. Import data to HBase.

      sparkSession.sql("insert into testhbase values('95274','abc','Jinan')")
    3. Read data from HBase.

      sparkSession.sql("select * from testhbase").show()
  • Connecting to data sources through DataFrame APIs

    1. Create a table to connect to an HBase data source.

        "CREATE TABLE test_hbase(id STRING, location STRING, city STRING, booleanf BOOLEAN, shortf SHORT, intf INT, longf LONG,
           floatf FLOAT, doublef DOUBLE) using hbase OPTIONS (\
         'ZKHost' = ',\
         'TableName' = 'table_DupRowkey1',\
         'RowKey' = 'id:5,location:6,city:7',\
         'Cols' = 'booleanf:CF1.booleanf, shortf:CF1.shortf, intf:CF1.intf, \  longf:CF1.longf, floatf:CF1.floatf, doublef:CF1.doublef')")


      • For details about the ZKHost, RowKey, and Cols parameters, see Table 1.

      • TableName: Name of a table in the CloudTable file. If no table name exists, the system automatically creates one.

    2. Construct a schema.

      schema = StructType([StructField("id", StringType()),\
                           StructField("location", StringType()),\
                           StructField("city", StringType()),\
                           StructField("booleanf", BooleanType()),\
                           StructField("shortf", ShortType()),\
                           StructField("intf", IntegerType()),\
                           StructField("longf", LongType()),\
                           StructField("floatf", FloatType()),\
                           StructField("doublef", DoubleType())])
    3. Set data.

      dataList = sparkSession.sparkContext.parallelize([("11111", "aaa", "aaa", False, 4, 3, 23, 2.3, 2.34)])
    4. Create a DataFrame.

      dataFrame = sparkSession.createDataFrame(dataList, schema)
    5. Import data to HBase.

    6. Read data from HBase.

      // Set cross-source connection parameters
      TableName = "table_DupRowkey1"
      RowKey = "id:5,location:6,city:7"
      Cols = "booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef"
      ZKHost = ",,
      // select
      jdbcDF =\
      jdbcDF.filter("id = '12333' or id='11111'").show()


      The length of id, location, and city parameter is limited. When inserting data, you must set the data values based on the required length. Otherwise, an encoding format error occurs during query.

  • Submitting a Spark job

    1. Upload the Python code file to DLI.

    2. (Optional) Add the krb5.conf and user.keytab files to other dependency files of the job when creating a Spark job in an MRS cluster with Kerberos authentication enabled. Skip this step if Kerberos authentication is not enabled for the cluster.

    3. In the Spark job editor, select the corresponding dependency module and execute the Spark job.


      • If the Spark version is 2.3.2 (will be offline soon) or 2.4.5, specify the Module to sys.datasource.hbase when you submit a job.

      • If the Spark version is 3.1.1, you do not need to select a module. Configure Spark parameters (--conf).



Complete Example Code

  • Connecting to MRS HBase through SQL APIs

    • Sample code when Kerberos authentication is disabled

      # _*_ coding: utf-8 _*_
      from __future__ import print_function
      from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, ShortType, LongType, FloatType, DoubleType
      from pyspark.sql import SparkSession
      if __name__ == "__main__":
        # Create a SparkSession session.
        sparkSession = SparkSession.builder.appName("datasource-hbase").getOrCreate()
          "CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS (\
          'ZKHost' = '',\
          'TableName' = 'hbtest',\
          'RowKey' = 'id:5',\
          'Cols' = 'location:info.location,')")
        sparkSession.sql("insert into testhbase values('95274','abc','Jinan')")
        sparkSession.sql("select * from testhbase").show()
        # close session
    • Sample code when Kerberos authentication is enabled

      # _*_ coding: utf-8 _*_
      from __future__ import print_function
      from pyspark import SparkFiles
      from pyspark.sql import SparkSession
      import shutil
      import time
      import os
      if __name__ == "__main__":
          # Create a SparkSession session.
          sparkSession = SparkSession.builder.appName("Test_HBase_SparkSql_Kerberos").getOrCreate()
          sc = sparkSession.sparkContext
          krb5_startfile = SparkFiles.get("krb5.conf")
          keytab_startfile = SparkFiles.get("user.keytab")
          path_user = os.getcwd()
          krb5_endfile = path_user + "/" + "krb5.conf"
          keytab_endfile = path_user + "/" + "user.keytab"
          shutil.copy(krb5_startfile, krb5_endfile)
          shutil.copy(keytab_startfile, keytab_endfile)
            "CREATE TABLE testhbase(id string,booleanf boolean,shortf short,intf int,longf long,floatf float,doublef double) " +
            "using hbase OPTIONS(" +
            "'ZKHost'=''," +
            "'TableName'='hbtest'," +
            "'RowKey'='id:100'," +
            "'Cols'='booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF2.longf,floatf:CF1.floatf,doublef:CF2.doublef'," +
            "'krb5conf'='" + path_user + "/krb5.conf'," +
            "'keytab'='" + path_user+ "/user.keytab'," +
            "'principal'='krbtest') ")
            sparkSession.sql("insert into testhbase values('95274','abc','Jinan')")
          sparkSession.sql("select * from testhbase").show()
          # close session
  • Connecting to HBase through DataFrame APIs

    # _*_ coding: utf-8 _*_
    from __future__ import print_function
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, ShortType, LongType, FloatType, DoubleType
    from pyspark.sql import SparkSession
    if __name__ == "__main__":
      # Create a SparkSession session.
      sparkSession = SparkSession.builder.appName("datasource-hbase").getOrCreate()
      # Createa data table for DLI-associated ct
       "CREATE TABLE test_hbase(id STRING, location STRING, city STRING, booleanf BOOLEAN, shortf SHORT, intf INT, longf LONG,floatf FLOAT,doublef DOUBLE) using hbase OPTIONS ( \
        'ZKHost' = ',\
        'TableName' = 'table_DupRowkey1',\
        'RowKey' = 'id:5,location:6,city:7',\
        'Cols' = 'booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef')")
      # Create a DataFrame and initialize the DataFrame data.
      dataList = sparkSession.sparkContext.parallelize([("11111", "aaa", "aaa", False, 4, 3, 23, 2.3, 2.34)])
      # Setting schema
      schema = StructType([StructField("id", StringType()),
                           StructField("location", StringType()),
                           StructField("city", StringType()),
                           StructField("booleanf", BooleanType()),
                           StructField("shortf", ShortType()),
                           StructField("intf", IntegerType()),
                           StructField("longf", LongType()),
                           StructField("floatf", FloatType()),
                           StructField("doublef", DoubleType())])
      # Create a DataFrame from RDD and schema
      dataFrame = sparkSession.createDataFrame(dataList, schema)
      # Write data to the cloudtable-hbase
      # Set cross-source connection parameters
      TableName = "table_DupRowkey1"
      RowKey = "id:5,location:6,city:7"
      Cols = "booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef"
      ZKHost = ",,
      # Read data on CloudTable-HBase
      jdbcDF =\
                           .option("ZKHost", ZKHost)\
                           .option("RowKey", RowKey)\
                           .option("Cols", Cols)\
      jdbcDF.filter("id = '12333' or id='11111'").show()
      # close session