半亩方塘

Stay foolish,Stay hungry


  • 首页

  • 标签

  • 分类

  • 归档

Cloudera Manager及CDH最新版本安装记录

发表于 2018-03-21 | 分类于 Cloudera Manager , CDH | 评论数: | 阅读次数:

大家都知道,Apache Hadoop的配置很繁琐,而且很零散,为此Cloudera公司提供了Clouder Manager工具,而且还封装了Apache Hadoop,flume,spark,hive,hbase等大数据产品形成自己特色的CDH产品,再使用CM进行安装,很大程度上方便了集群的搭建,并提供了集群的监控功能。

一、环境:

  1. 三台VMware虚拟机(一个做为主节点,两个做为从节点)
hserver1n(主节点) hserver2n(从节点) hserver3n(从节点)
CM Server
CM Agent CM Agent CM Agent
NameNode DateNode DateNode
Mysql
阅读全文 »

WordCount

发表于 2018-01-18 | 分类于 spark | 评论数: | 阅读次数:

在spark官网讲解spark streaming的时候,举了一个word count的例子,通过监听一个端口的TCP连接,统计单词的个数。程序如下(in scala):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import org.apache.spark._
import org.apache.spark.streaming._

object NetworkWordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(5))
Logger.getRootLogger.setLevel(Level.ERROR)

val lines = ssc.socketTextStream("localhost", 9998)
val wordcount=lines.flatMap(_.split("\\W+"))
.map((_,1))
.reduceByKey(_+_)
wordcount.print()

ssc.start()
ssc.awaitTermination()

}
}

阅读全文 »

Spark方法aggregate讲解

发表于 2018-01-18 | 分类于 spark | 评论数: | 阅读次数:

首先看一下Spark官网对该方法的讲解:aggregate(RDD.scala)

大致的意思是aggregate接收两个函数,和一个初始化值。seqOp函数用于聚集每一个分区,combOp用于聚集所有分区聚集后的结果。每一个分区的聚集,和最后所有分区的聚集都需要初始化值的参与。
举例如下:

集群环境:一台Master,三台Worker,在spark-shell中测试

1
2
3
4
5
6
7
8
9
10
11
12
13
scala> val  seqOp:(Int,Int)=>Int={(a,b)=>{println("seqOp"+a+"\t"+b);math.min(a,b)}}
seqOp: (Int, Int) => Int = <function2>

scala> val combOp:(Int,Int)=>Int={(a,b)=>{println("combOp"+a+"\t"+b);a+b}}
combOp: (Int, Int) => Int = <function2>

scala> val z=sc.parallelize(List(1,2,3,4,5,6,7,8),2)
z: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> z.aggregate(3)(seqOp,combOp)
[Stage 13:> (0 + 0) / 2]combOp3 1
combOp 4 3
res13: Int = 7

阅读全文 »

Lock应用示例-生产者与消费者

发表于 2018-01-18 | 分类于 java | 评论数: | 阅读次数:

Java Lock的使用以及与synchronized的区别很多文章已经讲解的很清楚了,这里不再详细讲解,可以参考java Lock讲解.
总的来说Lock的功能比synchronized更强大,功能更多,但一般的线程同步业务synchronized已经能够满足,对于一些特殊的要求,比如要知道线程获取锁的结果,线程获取锁时没有获取到,要求等待一段时间后仍没获取到就不去获取了,去做别的事情或者等待锁的过程中能够响应中断等等。Lock接口中的newCondition()方法用于获取锁对象上绑定的实例,用于线程的等待与唤醒,可以获取多个实例,而不是像synchronized只有一个锁对象(此时锁对象既是锁对象也是实例),好处是线程可以使用不同的实例,进行有针对的等待与唤醒,Lock接口唯一的实现类是ReentrantLock,下面我使用这个类来实现经典的生产者与消费者。

阅读全文 »

Java多线程应用示例-抽奖

发表于 2017-12-02 | 分类于 java | 评论数: | 阅读次数:

这里用java多线程实现一个简单的抽奖程序,从100个座位号中,随机抽取10位。
我设计的思路是用10个线程,每个线程从100个座位号中随机抽取1位,然后显示在黑板上,然后再去抽取,再显示。这里涉及到从100 个座位号中随机抽取时要线程之间要同步,否则会出现同一个号被多个线程抽取。一个线程大致分为如下几个步骤:
a.从待抽取的座位列表中随机抽取座位。
b.将抽取的座位号显示在黑板对应的位置上,并取得那个位置上当前显示的座位号。
c.将b步得到的座位号添加到待抽取列表中。

经过a,b,c三步,一个线程一次的动作算完成了,将这三步操作视为一个原子操作,进行线程间同步,剩下的就是循环抽取座位号就行了。

有几点需求注意:
1.b步可以不同步,但发现,当随机抽取的数量多时,此时一个线程(假设代号为1)还没有来得及更新黑板上对应位置的座位号,另一个线程(假设代号为2)已经将1线程放入座位组中的号又取出来,并显示在黑板上了。这样就会造成黑板上同一个座位号显示了多次。
2.需要区分哪些座位号是抽取过的,哪些座位号是没有抽取过的,这样每次抽取只抽取没有抽取过的。会自然而然的用两个List表示。但仔细想想,不需要用两个List区分,因为黑板上显示的座位号就是已经抽取的,只需要每次从座位池中删除,然后显示在黑板上,再次抽取时,将黑板上原来显示的座位号放回座位池中即可。
3.是否可以用LinkedList代替ArrayList。大家都知道LinkedList的删除,插入性能优于ArrayList,并且这里线程也需要反复不断的对座位池进行删除,新增操作。但这里的新增只是在最后面加入,影响不大,有影响的就是删除操作。但是这样还会有不断的随机取号操作。到底是删除操作影响大,还是取号操作影响大呢?经过验证发现,当座位池中座位号比较大时(假如为10万),LinkedList速度会明显慢于ArrayList,几乎有10倍之差。

阅读全文 »

Hive中自定义Map/Reduce示例 In Python

发表于 2017-11-27 | 分类于 hive | 评论数: | 阅读次数:

Hive支持自定义map与reduce script。接下来我用一个简单的wordcount例子加以说明。使用Python开发(如果使用Java开发,请看这里)。

1
2
3
4
开发环境:
python:2.7.5
hive:2.3.0
hadoop:2.8.1

###一、map与reduce脚本
map脚本(mapper.py)

1
2
3
4
5
6
7
8
9
10
11
12
#!/usr/bin/python
import sys
import re
while True:
line = sys.stdin.readline().strip()
if not line:
break
p = re.compile(r'\W+')
words=p.split(line)
#write the tuples to stdout
for word in words:
print '%s\t%s' % (word, "1")

阅读全文 »

Hive中自定义Map/Reduce示例 In Java

发表于 2017-11-24 | 分类于 hive | 评论数: | 阅读次数:

Hive支持自定义map与reduce script。接下来我用一个简单的wordcount例子加以说明。
如果自己使用Java开发,需要处理System.in,System,out以及key/value的各种逻辑,比较麻烦。有人开发了一个小框架,可以让我们使用与Hadoop中map与reduce相似的写法,只关注map与reduce即可。如今此框架已经集成在Hive中,就是$HIVE_HOME/lib/hive-contrib-2.3.0.jar,hive版本不同,对应的contrib名字可能不同。

阅读全文 »

FileSystem关键几个方法的时序图

发表于 2017-09-11 | 分类于 hadoop | 评论数: | 阅读次数:

Hadoop将底层文件系统抽象成FileSystem类,上层用户可以通过相同方法操作底层不同的文件系统。常用的方法有get一个FileSystem实例,open打开一个文件的输入流,然后通过输入流的read方法读取数据。下面简单地将这三个方法时序流程中主要环节以时序图形式表示(以文件系统为HDFS为例),以便读者有所了解。

  • get方法获得FileSystem实例
    FileSystem抽象类通过读取配置文件”core-site.xml”中属性”fs.default.name”的值,判断使用哪个文件系统,然后通过反射生成文件系统实例,后调用DistributedFileSystem(FileSystem的继承实现类)的initialize方法获取NameNode的代理客户端类。

get时序图.jpg

阅读全文 »

第5章:Hadoop I/O

发表于 2017-09-04 | 分类于 hadoop | 评论数: | 阅读次数:

Hadoop有一些数据I/O方面操作的工具,其中一些比Hadoop使用的都更普遍。例如数据完整性和压缩。但是当使用这些工具处理多达几TB数据的时候,仍然需要特别注意。其它的一些工具或APIs构建成了模块用于开发分布式系统,例如:序列化框架和硬盘上的数据结构化。

数据完整性

Hadoop的使用者肯定都希望数据在存储或处理的过程中不会丢失或损坏。然而,由于硬盘或网络的每一次I/O操作都有可能在读或写的过程中出错,Hadoop经常处理的数据量都相当大,当这样地大量数据在系统中传输时,数据损坏的概率还是挺高的。

通常用于判断数据是否损坏所使用的方法是当数据第一次进入系统时候计算一个校验和,并不管在任何时候,数据被传输通过一个不可信的通道后,再一次计算校验和,就可以判断数据是否损坏。如果新生成的校验和与原始的校验和不一样,就认为数据遭到了损坏。这个技术不能够修复数据,仅仅是检查数据是否损坏。(这也是不使用低档的硬件的原因,特殊情况下,肯定要使用ECC内存)。注意:有时校验和也可能损坏,而数据没有损坏,但这种可能性很低,因为校验和比数据小多了。

通常使用的错误检测码是CRC-32(32位循环冗余校验),不管输入的数据量多大,都计算出一个32的整数校验和。CRC-32在Hadoop的ChecksumFileSystem类中用于计算校验和。然而HDFS使用的是另一种更有效的方式,叫做CRC-32C。

阅读全文 »

第4章:YARN

发表于 2017-08-24 | 分类于 hadoop | 评论数: | 阅读次数:

Apache YARN(Yet Another Resource Negotiator)是一个Hadoop集群资源管理系统。YARN是在Hadoop 2引入的,用以改善MapReduce的表现。但是它也足够胜任其它的分布式计算框架。

YARN提供了一些能被请求调用的APIs,并处理集群资源。但是通常用户不会直接调用这些APIs,而是调用由分布计算框架提供的更高级别的APIs。这些更高级别的APIs基于YARN建立,并对用户隐藏了资源管理的细节。图4-1说明了这个情景,并显示了一些分布式计算框架(MapReduce,Spark等等)作为YARN应用位于集群计算层(YARN)和存储层之上(HDFS和Hbase)。
图4-1:YARN应用
图4-1框架表示的应用层之上还有一层应用,如Pig,Hive和Crunch。它们都运行在MapReduce,Spark或Tez(或同时三者)之上,并不和YARN直接交互。

这一章节带领大家过一遍YARN的特性,为理解第四部分章节的内容(包括一些分布式处理框架)打下基础。

阅读全文 »
123
Mr.Yang

Mr.Yang

23 日志
10 分类
25 标签
GitHub E-Mail
© 2017 — 2019 Mr.Yang
由 Hexo 强力驱动 v3.7.1
|
主题 — NexT.Gemini v6.3.0