基本框架

实时数据输入基本上来自Flume的数据采集,通过对应topic存入kafka之中,Spark streaming实时消费对应topic的内容并进行数据分析。分析结果的处理分很多种情况,可以存入kafka,写入HDFS或本地文件以供后续使用,也可以直接将结果展示到页面上,或发送邮件,短信通知。

为什么使用Kafka

框架中在Flume实时数据和Spark Streaming中间加上了一层Kafka,实际上Flume是可以直接和Spark streaming对接,那么为什么要加上Kafka呢。主要原因如下:

  • 可靠性: Flume的file channel虽然提供了一定的可靠性,但是file channel其实相当于一个队列,并且容量不是特别大,当spark streaming当掉或sink因为其它原因无法发送数据的时候,file channel积累的数据很可能就会超出限制,导致数据丢失。而Kafka支持PB级的数据存储,在容量上面不用担心。并且Kafka是多节点运行,冗余存储,基本上很难挂掉。
  • 低延迟: Kafka和Flume集成可以达到次秒级延时,基本上可以满足需求。
  • 扩展性: 使用Kafka可以方便的与其它业务的需求对接(主要在业务端进行Coding,不用改变该框架的基本内容。如果只使用Flume,则需要改动Flume配置,存在一定风险)。

快速搭建Kafka环境

Kafka quick start

搭建Spark Streaming环境

1. 下载spark binary包

spark-1.5.0-bin-hadoop2.6.tgz

2. 启动Standalone master server

./sbin/start-master.sh

3. 启动Worker

./sbin/start-slave.sh <master-spark-URL>

通过master server的web UI(默认是http://localhost:8080)可以查看详情。

Spark Streaming 示例

官方示例可查看 git_KafkaWordCount

运行以下命令可以检测spark streming和kafka是否能集成使用

./bin/run-example streaming.DirectKafkaWordCount <kafka broker list[host:port]> <topic>
例如:
[localhost@hadoop002 kafka]$ ./bin/run-example streaming.DirectKafkaWordCount localhost:9092,localhost:9093 kafka-test

打开kafka产生端输入数据

[localhost@hadoop002 kafka]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-test
hello world
how are you

可以看见spark streaming输出

[localhost@hadoop002 spark-1.4.1-bin-hadoop2.6]$ ./bin/run-example streaming.DirectKafkaWordCount localhost:9092,localhost:9093 kafka-test
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/10/23 17:17:35 INFO StreamingExamples: Setting log level to [WARN] for streaming example. To override add a custom log4j.properties to the classpath.
15/10/23 17:17:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-------------------------------------------
Time: 1445591924000 ms
-------------------------------------------

-------------------------------------------
Time: 1445591926000 ms
-------------------------------------------
(hello,1)
(world,1)

-------------------------------------------
Time: 1445591928000 ms
-------------------------------------------
(how,1)
(are,1)
(you,1)

说明搭建好的环境可以正常运行

代码编写

初始化上下文

val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val streamingContext = new StreamingContext(sparkConf, Seconds(1))

Consumer配置

val topics = "kafka-test"
val topicsSet = topics.split(",").toSet
val brokers = "10.1.2.52:9092,10.1.2.52:9093"
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, topicsSet)

数据分析

这里为word count

val lines = messages.map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)

producer 配置

    //producer config
    val props = new Properties()
    props.put("metadata.broker.list", "10.1.2.52:9092")
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props.put("key.serializer.class", "kafka.serializer.StringEncoder")
    props.put("request.required.acks", "1")
    val topic = "producer-test"
    val ip = "10.1.2.52"

    def func = (rdd: RDD[(String, Long)]) => {
      val broadcastedConfig = rdd.sparkContext.broadcast(props)
      rdd.foreachPartition { partitionOfRecords =>
        partitionOfRecords.foreach(r => {
          val producer = new Producer[String, String](new ProducerConfig(broadcastedConfig.value))
          producer.send(new KeyedMessage[String, String]("producer-test", r.toString()))
        })
      }
    }
    wordCounts.foreachRDD(func)

在开始之前

这里使用 Web server + Executor 的模式,不使用Solo server 下载地址

环境搭建

配置数据库

注:目前Azkaban2仅支持MySQL作为数据存储仓库。

  1. 安装MySQL

可参考: MySQL Documentation Site

  1. 配置数据库

为Azkaban创建一个数据库,如:

mysql> CREATE DATABASE azkaban;

为Azkaban创建一个数据库用户,如:

mysql> CREATE USER '<username>'@'localhost' IDENTIFIED BY '<password>';

配置用户权限,为创建的用户添加对于Azkaban数据库的INSERT, SELECT, UPDATE, DELETE权限。

mysql> GRANT SELECT,INSERT,UPDATE,DELETE ON <database>.* to '<username>'@'%' WITH GRANT OPTION;

(可选)配置MySQL max_allowed_packet属性,提高Packet Size,目前尚未发现未配置有何不良影响。

修改/etc/my.cnf,如下:

[mysqld]
...
max_allowed_packet=1024M

修改之后重启MySQL(数据库不方便重启的可以暂时跳过这一步)

$ sudo service mysqld restart
  1. 创建Azkaban表

下载页面下载 azkaban-sql-script 包,在新建的Azkaban数据库里面运行create-all-sql脚本即可。忽略update开头的脚本。

  1. 获取JDBC Connector Jar包

由于某些原因,Azkaban不提供这个包,我们需要自己下载然后放到Web Server以及Executor Server的extlib目录下。MySQL JDBC connector jar

启动Azkaban Web Server

将下载的Web Server包解压,进入Web Server目录下。

生成KeyStore

执行以下命令,根据指示输入即可。

$ keytool -keystore keystore -alias jetty -genkey -keyalg RSA
配置
  1. azkaban.properties中配置keyStroe相关参数,例如:
jetty.keystore=keystore
jetty.password=password
jetty.keypassword=password
jetty.truststore=keystore
jetty.trustpassword=password
  1. azkaban.properties中配置mysql相关参数,例如:
database.type=mysql
mysql.port=3306
mysql.host=localhost
mysql.database=azkaban
mysql.user=azkaban
mysql.password=azkaban
mysql.numconnections=100
  1. azkaban.properties中配置UserManager相关参数,例如:

UserManager提供了用户认证和用户角色信息。根据以下配置,Azkaban会使用XmlUserManager获取azkaban-users.xml用的帐号/密码和角色信息。

user.manager.class=azkaban.user.XmlUserManager
user.manager.xml.file=conf/azkaban-users.xml
运行Web Server

确保azkaban.properties中的配置如下:

jetty.maxThreads=25
jetty.ssl.port=8443

执行bin/start-web.sh启动web server 执行bin/azkaban-web-shutdown.sh停止

启动Azkaban Executor Server

#####配置

azkaban.properties中配置mysql相关参数,例如:

database.type=mysql
mysql.port=3306
mysql.host=localhost
mysql.database=azkaban
mysql.user=azkaban
mysql.password=azkaban
mysql.numconnections=100
运行Executor Server

确保azkaban.properties中的配置如下:

# Azkaban Executor settings
executor.maxThreads=50
executor.port=12321
executor.flow.threads=30

执行bin/start-exec.sh启动executor server 执行bin/azkaban-exec-shutdown.sh停止

使用Azkaban

设置邮件提醒

配置发送端 在Web Server azkaban.properties 中将如下配置填充。

# mail settings
mail.sender=
mail.host=
mail.user=
mail.password=

接收端有两种配置方法,一种是在job的配置文件中配置,例如:

# hello 

type=command
command=echo "Hello "
command.1= sh hello.sh
success.emails=
failure.emails=
notify.emails=

另一种是在执行Job的时候重写邮箱列表,例如:

{F423}

工作流依赖关系配置

例如:

hello.job

# hello 

type=command
command=echo "Hello "
command.1= sh hello.sh
failure.emails=
retries=3
retry.backoff=1000

world.job

# world.job

type=command
dependencies=hello
command=echo "world!"

world2.job

# world.job

type=command
dependencies=hello
command=echo "world2!"

end.job

# world.job

type=command
dependencies=world,world2
command=echo "world2!"

打包上传可以查看工作流信息如下: {F425}

Job Types使用介绍

可参考: Jobtypes

简介

基于全表导出和全表导入。流程为Source Hive -> Source HDFS Cluster -> Destination HDFS Cluster -> Destination Hive

(Source端)群集操作

1. 创建导出临时目录

这里定为hdfs://tmp/hive-export/ 假设这里导出的数据库名为cdp_data

$ sudo -u hdfs dfs -mkdir -p /tmp/hive-export/cdp_data

2. 生成导出数据脚本

$ sudo -u hdfs hive -e "use cdp_data; show tables;" | \
awk '{printf "export table %s to @/tmp/hive-export/cdp_data%s@;\n",$1,$1}' | \
sed "s/@/'/g" > export.hql

3. 执行导出数据脚本

$ sudo -u hdfs hive -e "use cdp_data; source export.hql"

4. 数据导出完成

(Destination端)群集操作

1 创建导入临时目录

这里定为hdfs://tmp/hive-import/

$ sudo -u hdfs dfs -mkdir -p /tmp/hive-import/cdp_data

2. 从Source端复制导出到HDFS的数据

这里用DistCp,该步只能在Destination端进行。并且需要用hftp连接Source端的hdfs文件系统。这是为了避免因Cluster版本不同产生的问题。

$ sudo -u hdfs hadoop distcp hftp://<source host>:50070/tmp/hive-export/cdp_data \
hdfs://<destination host>:8020/tmp/hive-import/cdp_data

3. 生成导入数据脚本

$ sudo -u hdfs hdfs dfs -ls /tmp/hive-import/cdp_data/ | \
awk '{print $8}' | awk -F '/' '{print $5}' | grep -v "^$"  > table.list

$ cat table.list  | \
awk '{printf "import table %s from @/tmp/hive-import/cdp_data/%s@;\n",$1,$1}' | \
sed "s/@/'/g" > import.hql

4. 执行导入数据脚本

$ sudo -u hdfs hive -e "use cdp_data; source import.hql"

简介


Kerberoso为一种计算机网络认证协议,它允许某实体在非安全网络环境下通信,向另一个实体以一种安全的方式证明自己的身份。这里指由麻省理工实现此协议,并发布的一套免费软件。Kerberos协议基于对称密码学。Kerberos工作在用于证明用户身份的”票据(tickets)”的基础上。KDC持有一个密钥(principals)数据库;每个网络实体——无论是客户还是服务器——共享一套只有他自己和KDC知道的密钥。密钥的内容用于证明实体的身份。对于两个实体间的通信,KDC产生一个会话密钥,用来加密他们之间的交互信息。

[[ https://en.wikipedia.org/wiki/Kerberos_(protocol) Kerberos Wiki ]]

系统环境


  • 系统://CentOS 6.3 x64// X 2

  • Cloudera Manager://5.4.4//

  • CDH: //5.4.4//

  • JDK: //1.7.0_79//

  • 群集节点规划如下

IP Hostname Roles
10.1.2.51 hadoop001 NameNode, DataNode, Kerberos client
10.1.2.52 hadoop002 DataNode, Kerberos Server, Kerberos client

注意:hostname 需使用小写,以避免不必要的错误。

前置条件


1. 安装Cloudera Management和CDH:

可参考 通过Cloudera Management安装CDH中的Hadoop/Hive/Spark

2. 配置Kerberos服务:

以下操作使用root权限进行。

1) 安装 Kerberos Server:

在hadoop002上安装Kerberos Server:

[root@hadoop002 ~]# yum install krb5-server -y

2) 安装 Kerberos Client:

在hadoop001和hadoop002上安装Kerberos Client:

[root@hadoop001 ~]# yum install krb5-workstation krb5-libs -y

[root@hadoop002 ~]# yum install krb5-workstation krb5-libs -y

3) 编辑 Kerberos Server 配置文件(kdc.conf):

以下仅列出需要的配置,详细配置参考:[[ http://web.mit.edu/~kerberos/krb5-devel/doc/admin/conf_files/kdc_conf.html kdc.conf ]]:
[root@hadoop002 ~]# vim /var/kerberos/krb5kdc/kdc.conf

[kdcdefaults]

 kdc_ports = 88

 kdc_tcp_ports = 88


[realms]

 CDP.COM = {

  #master_key_type = aes256-cts

  master_key_type = aes128-cts

  acl_file = /var/kerberos/krb5kdc/kadm5.acl

  dict_file = /usr/share/dict/words

  admin_keytab = /var/kerberos/krb5kdc/kadm5.keytab

  max_life = 1d

  max_renewable_life = 7d

  supported_enctypes = aes128-cts:normal des3-hmac-sha1:normal arcfour-hmac:normal des-hmac-sha1:normal des-cbc-md5:normal des-cbc-crc:normal

  default_principal_flags = +renewable, +forwardable

 }

说明:

  • CDP.COM :设定的realms, 一般使用大写。

  • master_key_typesupported_enctypes : 默认使用aes256-cts,为了简便,这里不使用 aes256-cts 算法(原因见下面说明)。

  • max_renewable_life = 7dmax_life = 1ddefault_principal_flags = +renewable, +forwardable : Cloudera Manager要求配置凭证生命周期非零且可更新。

关于AES-256加密:

如果你使用的是CentOS或Red Hat Enterprise Linux 5.5及以上的系统,则默认使用AES-256加密Kerberos的tickts。需要给群集里的所有节点安装[[ http://www.oracle.com/technetwork/java/javase/downloads/index.html Java Cryptography Extension (JCE) Unlimited Strength Jurisdiction Policy File ]]。
这里使用的是JDK1.7,对应的JCE下载地址[[ http://www.oracle.com/technetwork/java/embedded/embedded-se/downloads/jce-7-download-432124.html Java Cryptography Extension (JCE) Unlimited Strength Jurisdiction Policy Files for JDK/JRE 7 ]] 。下载完后将zip包内的两个文件解压到 $JAVA_HOME/jre/lib/security 即可。

4) 修改 Kerberos Client 配置文件(krb5.conf):

[root@hadoop002 ~]# vim /etc/krb5.conf

[logging]

 default = FILE:/DATA/log/krb5libs.log

 kdc = FILE:/DATA/log/krb5kdc.log

 admin_server = FILE:/DATA/log/kadmind.log


[libdefaults]

 default_realm = CDP.COM

 dns_lookup_realm = false

 dns_lookup_kdc = false

 ticket_lifetime = 24h

 renew_lifetime = 7d

 forwardable = true

 clockskew = 120

 udp_preference_limit = 1


[realms]

 CDP.COM = {

  kdc = hadoop002

  admin_server = hadoop002

 }


[domain_realm]

 .cdp.com = CDP.COM

 cdp.com = CDP.COM


说明:

  • 基本上是默认配置,修改一些hostname和realms配置即可。

  • clockskew = 120 :表示tickets与服务器的时钟偏差可容忍在120秒之内。

  • udp_preference_limit = 1 : 据说可以避免一个Hadoop的错误。

5) 编辑 访问控制列表文件 (kadm5.acl)

该文件包含所有获许管理 KDC 的主体名称。

[root@hadoop002 ~]# cat /var/kerberos/krb5kdc/kadm5.acl

*/admin@CDP.COM *

6) 同步配置文件

将 hadoop002 中的 /etc/krb5.conf 拷贝到其他主机(即hadoop001)

[root@hadoop001 ~]# scp hadoop002:/etc/krb5.conf /etc/krb5.conf

7) 创建数据库

该数据库用于存储principals。其中 -r 指定对应 realm。用 -d 可指定数据库名字,默认为principal。

[root@hadoop002 ~]#  kdb5_util create -r CDP.COM -s

如果提示数据库已经存在,则要把 /var/kerberos/krb5kdc/ 目录下的 principal(数据库名称) 的相关文件都干掉。

8) 启动服务

在 hadoop002 节点上运行:

[root@hadoop002 ~]# chkconfig --level 35 krb5kdc on

[root@hadoop002 ~]# chkconfig --level 35 kadmin on

[root@hadoop002 ~]# service krb5kdc start

[root@hadoop002 ~]# service kadmin start

9) 创建 kerberos 管理员

关于 kerberos 的管理,可以使用 kadmin.local 或 kadmin,区别如下:

  • 本地机器(即kerberos server所在主机)使用 kadmin.local。不需要密码。
[root@hadoop002 ~]# kadmin.local

Authenticating as principal root/admin@CDP.COM with password.

kadmin:

  • 远端机器(即kerberos client)使用 kadmin 远程连接kerberos server,需要有管理员权限的principal。需要认证。
[root@hadoop001 ~]# kadmin

Authenticating as principal cloudera-scm/admin@CDP.COM with password.

Password for cloudera-scm/admin@CDP.COM:

kadmin:

所以这里我们需要创建一个远程管理员用以远端登陆。同时Cloudera Manager也会用到。

[root@hadoop001 ~]# kadmin.local -q "addprinc cloudera-scm/admin"

系统会提示输入密码,密码不能为空。

10) Kerberos 相关操作

至此kerberos服务搭建完成,可以用以下几个指令测试是否搭建成功:

#查看principals

kadmin.local: list_principals


#添加principal

kadmin.local:  addprinc test

WARNING: no policy specified for test@CDP.COM; defaulting to no policy

Enter password for principal "test@CDP.COM":

Re-enter password for principal "test@CDP.COM":

Principal "test@CDP.COM" created.


#删除principal

kadmin.local:  delete_principal test

Are you sure you want to delete the principal "test@CDP.COM"? (yes/no): yes

Principal "test@CDP.COM" deleted.

Make sure that you have removed this principal from all ACLs before reusing.

[root@hadoop001 ~]# kadmin.local -q "addprinc root/admin" //查看principals。

#获得ticket

[root@hadoop002 ~]# kinit test

Password for test@CDP.COM:


#查看持有的ticket

[root@hadoop002 ~]# klist -e

Ticket cache: FILE:/tmp/krb5cc_0

Default principal: test@CDP.COM


Valid starting     Expires            Service principal

09/10/15 17:53:55  09/11/15 17:53:55  krbtgt/CDP.COM@CDP.COM

        renew until 09/10/15 17:53:55, Etype (skey, tkt): aes128-cts-hmac-sha1-96, aes128-cts-hmac-sha1-96


#销毁持有的ticket

[root@hadoop002 ~]# kdestroy

[root@hadoop002 ~]# klist

klist: No credentials cache found (ticket cache FILE:/tmp/krb5cc_0)

在Cloudera Manager上启动Kerberos


Important:

  1. If you have enabled YARN Resource Manager HA in your non-secure cluster, you should clear the StateStore znode in ZooKeeper before enabling Kerberos. To do this:
  1. Go to the Cloudera Manager Admin Console home page, click to the right of the YARN service and select Stop.
  1. When you see a Finished status, the service has stopped.
  1. Go to the YARN service and select Actions > Format State Store.
  1. When the command completes, click Close.
  1. 进入Cloudera Manager

  2. Administratio > Kerberos

  3. 点击Enable Kerberos

首先

{F332}

有四个前置条件

  1. 有正在运行的KDC(即Kerberos Server),前置条件已经搭建。

  2. KDC配置为非零生命周期且可更新,前置条件已经配置。

  3. OpenLdap,这里没有用到。

  4. 已创建远端登陆管理员帐户。

下一步1

{F334}

都很明显

下一步2

{F336}

在搭建KDC时已经手动同步了krb5.conf,因此这里不打勾。

实际上在这里扔给Cloudera Manager管理会好一点。

下一步3

{F338}

远端principal的名称和密码

下一步4

{F340}

导入principal

下一步5

{F342}

配置Cloudera Manager为群集中的服务创建的Principals的名称

下一步6

{F344}

默认配置即可

下一步7

{F346}

这一步由于之前在测试的时候创建了同样的principals没有删除而出错了。只要确认KDC里面没有群集服务要使用的principals名称就不会出错。

查看principals

{F348}

到这里基本上就算完工了。接下来就是使用hdfs,跑mapreduce测试权限认证有没有正常运作。

注:运行MR2时,要清空yarn/nm/usercache下的用户缓存文件。在SIMPLE模式下和在KERBORES模式下建立的文件权限不一样。

验证权限认证系统正常运行

认证当前用户并获得ticket,principal格式为: username@YOUR-REALM.COM(需提前在KDC添加principal)

[localhost@hadoop001 ~]$ kinit

Password for localhost@CDP.COM:

[localhost@hadoop001 ~]$ klist

Ticket cache: FILE:/tmp/krb5cc_500

Default principal: localhost@CDP.COM


Valid starting     Expires            Service principal

09/10/15 19:24:51  09/11/15 19:24:51  krbtgt/CDP.COM@CDP.COM

        renew until 09/10/15 19:24:51


#### 运行hadoop mapreduce任务


结果

[localhost@hadoop001 ~]$ hadoop jar /DATA/cloudera/parcels/CDH-5.4.4-1.cdh5.4.4.p0.4/jars/hadoop-examples.jar pi 10 10000

Number of Maps  = 10

Samples per Map = 10000

...

Job Finished in 49.05 seconds

Estimated value of Pi is 3.14120000000000000000

运行hive beeline

[localhost@hadoop001 ~]$ beeline

Beeline version 1.1.0-cdh5.4.4 by Apache Hive

beeline> !connect jdbc:hive2://localhost:10000/;principal=hive/hadoop001@CDP.COM

scan complete in 5ms

Connecting to jdbc:hive2://localhost:10000/;principal=hive/hadoop001@CDP.COM

Enter username for jdbc:hive2://localhost:10000/;principal=hive/hadoop001@CDP.COM:

Enter password for jdbc:hive2://localhost:10000/;principal=hive/hadoop001@CDP.COM:

Connected to: Apache Hive (version 1.1.0-cdh5.4.4)

Driver: Hive JDBC (version 1.1.0-cdh5.4.4)

Transaction isolation: TRANSACTION_REPEATABLE_READ

0: jdbc:hive2://localhost:10000/> show databases;

+-----------------+--+

|  database_name  |

+-----------------+--+

| default         |

| hive_auth_test  |

+-----------------+--+

4 rows selected (3.639 seconds)

0: jdbc:hive2://localhost:10000/> select * from hive_auth_test.t1;

+--------+--+

| t1.id  |

+--------+--+

| 0      |

| 1      |

+--------+--+

2 rows selected (1.635 seconds)

0: jdbc:hive2://localhost:10000/>


(1) ReliableSpoolingFileEventReader


下面代码块使用一个名为canary的文件。canary: 金丝雀。 金丝雀曾在矿井中被用于早期预警,这段代码的意义也在于此。

  1. 创建一个canary文件: 检测创建功能。
  2. 然后写。再读。Check是否读出内容: 检测读写功能。
  3. 最后删除canary文件: 检测删除功能。 预先检测在Spooling directory内的所有操作能否成功。
// Do a canary test to make sure we have access to spooling directory
try {
  File canary = File.createTempFile("flume-spooldir-perm-check-", ".canary",
      spoolDirectory);
  Files.write("testing flume file permissions\n", canary, Charsets.UTF_8);
  List<String> lines = Files.readLines(canary, Charsets.UTF_8);
  Preconditions.checkState(!lines.isEmpty(), "Empty canary file %s", canary);
  if (!canary.delete()) {
    throw new IOException("Unable to delete canary file " + canary);
  }
  logger.debug("Successfully created and deleted canary file: {}", canary);P
} catch (IOException e) {
  throw new FlumeException("Unable to read and modify files" +
      " in the spooling directory: " + spoolDirectory, e);
}

这块代码位于上面那块之下。 trackerDirPath传自上一级的SpoolDirectorySource类。默认值为 “.flumespool” if条件块一:File.isAbsolute() : 判断路径是否为绝对路径。 因为下面使用的是new File(parent, child)。 if条件块二:若trackerDirectory不存在则自动创建该目录。 if条件块三:确定当前trackerDirectory为目录。 if条件块四:删除旧的this.metaFile

File trackerDirectory = new File(trackerDirPath);

// if relative path, treat as relative to spool directory
if (!trackerDirectory.isAbsolute()) {
  trackerDirectory = new File(spoolDirectory, trackerDirPath);
}

// ensure that meta directory exists
if (!trackerDirectory.exists()) {
  if (!trackerDirectory.mkdir()) {
    throw new IOException("Unable to mkdir nonexistent meta directory " +
        trackerDirectory);
  }
}

// ensure that the meta directory is a directory
if (!trackerDirectory.isDirectory()) {
  throw new IOException("Specified meta directory is not a directory" +
      trackerDirectory);
}

this.metaFile = new File(trackerDirectory, metaFileName);
if(metaFile.exists() && metaFile.length() == 0) {
  deleteMetaFile();
}
private void deleteMetaFile() throws IOException {
    if(this.metaFile.exists() && !this.metaFile.delete()) {
        throw new IOException("Unable to delete old meta file " + this.metaFile);
    }
}