
Pig's Data Model and the Order and Limit Relational Operations
The previous post covered installing Pig and a first hands-on example; this one walks through Pig's data model.
Pig's data model falls into two broad categories: scalar types and complex types. A scalar type holds a single value; a complex type can contain values of any other type, including complex ones.
The scalar types match the simple types of most mainstream languages: Pig provides int, long, float, double, chararray (like Java's String), and bytearray (raw bytes).
Pig is implemented in Java, so, as Java programmers would expect, these scalar types are backed by the corresponding java.lang classes.
Note that Pig's scalar types include no boolean, yet booleans do exist at runtime: the filter operator only lets a record flow through when its predicate evaluates to true. Keep this distinction in mind.
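For instance (the file name here is hypothetical), the predicate below yields a runtime boolean even though no field could be declared boolean:
nums = load 'numbers.txt' as (n:int);
pos = filter nums by n > 0; -- n > 0 evaluates to a boolean at runtime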
The complex types are map, tuple, and bag.
A map is like Java's Map<String, Object>: the key must be a chararray, while the value may be any scalar or complex type. It is written as [key#value].
A tuple can be thought of as a Java List; if you know Python, it is even closer to a Python tuple. It is written as (1, "abc", 1.5).
A bag is, as I understand it, like a Java Set<Tuple>: its contents are unordered (unlike a Set, though, a bag may hold duplicate tuples). It is written as {(1, "abc", 1.5), (2, "bcd", 2.0), ...}.
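As a quick sketch (file and field names are hypothetical), here is how these types appear in a load schema:
players = load 'players.txt' as (
    name:chararray,                -- scalar
    stats:map[],                   -- map: chararray keys, any values
    score:tuple(hi:int, lo:int),   -- tuple: fixed, ordered fields
    games:bag{t:(g:chararray)}     -- bag: unordered collection of tuples
);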
Picking up the word-count example from the previous post, I'll rework it and walk through the state of the data at each step.
The first step is to read the text file in and split it into words.
words = load 'nie.txt' using PigStorage(' ') as (line);
Loading is always done with load. Here PigStorage does the work; it splits each line on tabs by default, but since my input is an English article, a space is the right delimiter. as (line) assigns a schema: everything loaded into Pig is a tuple, so we write (line), or equivalently (line:chararray). A tuple schema may also name several fields, e.g. (line:chararray, num1:int, num2:double), but the data must actually fit that shape; otherwise loading fails, or the mismatched fields come back null, which is useless.
Results can be dumped anywhere in a Pig Latin script, so let's look at words first. The output:
(extravagant,)
(unfathomable)
(self-tyranny--Nature)
This is just an excerpt, but you can see each record is a tuple holding a single value.
To get word frequencies, the next step mirrors MapReduce's Map phase: gather identical words together so they can be counted. One statement does it:
grpd = group words by line;
Next, dump grpd to inspect the result:
(different,{(different),(different)})
(extremely,{(extremely)})
(mistaken,,{(mistaken,)})
(naturally,{(naturally)})
(ourselves,{(ourselves),(ourselves)})
(permitted,{(permitted)})
(something,{(something),(something)})
Anyone familiar with MapReduce will recognize this: identical words are pulled together into records of the form (word, {(word), (word)}). The Reduce side then only has to walk each bag {(word), (word)} once and emit a count, and word frequency is done.
The complete program:
words = load 'nie.txt' using PigStorage(' ') as (line); -- split on spaces and read the words in
grpd = group words by line; -- cluster identical words: effectively the Map phase
cntd = foreach grpd generate group, COUNT(words); -- count each group: effectively the Reduce phase
dump cntd; -- print the result to the console
Run correctly, the program above lists every word with its number of occurrences. For a long text that raw list is not very interesting; usually you want the top high-frequency words, which calls for an order by followed by taking the first N rows. Happily, Pig supports both.
cous = order cntd by $1 desc;
limitresult = limit cous 50;
These two lines go just before the dump above: sort, then keep only the first 50 rows.
Notice the $1 reference: foreach always builds a tuple, and since COUNT() was given no alias, its output field is anonymous; anonymous fields are addressed as $N, with indexes starting at 0. Pig Latin can also assign an explicit alias with as, just like SQL's AS.
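For illustration, the same ordering with explicit as aliases (the names word and cnt are mine):
cntd = foreach grpd generate group as word, COUNT(words) as cnt;
cous = order cntd by cnt desc; -- equivalent to ordering by $1 desc, but self-documenting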
This version takes noticeably longer to run, but the output is far more meaningful. The first time I ran it, the job threw an OutOfMemory error, so I also edited Pig's configuration file, $PIG_HOME/conf/pig.properties, appending:
pig.cachedbag.memusage=0
io.sort.mb=40
io.sort.record.percent=0.1
Sorting in Pig can demand a lot of memory; these settings let part of the data be serialized to disk temporarily. Search the web for the exact meaning of each setting.
If you've written Hadoop MapReduce before, doesn't Pig feel like a genuine shortcut?
What follows are my notes taken while reading the book; please credit the source when reposting.
- Pig Latin is a data-flow language. Relation names follow Java's variable-naming rules, and a name may be reused (not recommended: reassigning a name effectively creates a new relation and discards the old one):
A = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
A = filter A by dividends > 0;
A = foreach A generate UPPER(symbol);
- Comments: -- for a single line; /* */ for multi-line.
- Pig Latin keywords are case-insensitive (load, foreach, ...), but relation names and UDF names are case-sensitive: COUNT is a UDF, so it is not the same name as count.
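A quick sketch of the distinction:
-- keywords may be written in any case; these two statements are equivalent:
divs = LOAD 'NYSE_dividends' AS (exchange, symbol, date, dividends);
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
-- UDF names are not: COUNT resolves to the built-in, count does not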
- load reads data.
By default paths are resolved against your HDFS home directory (/users/yourlogin); you can change the working directory with cd at the grunt prompt.
divs = load '/data/examples/NYSE_dividends';
A full URL also works:
divs = load 'hdfs:///data/examples/NYSE_dividends';
The default field delimiter is TAB (\t); using lets you choose a different one:
divs = load 'NYSE_dividends' using PigStorage(',');
Note: the delimiter must be a single character.
using can also name a different load function altogether:
divs = load 'NYSE_dividends' using HBaseStorage();
as attaches a schema:
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
A glob can load every file in a directory (files in its subdirectories are loaded too). Glob semantics are determined by the Hadoop filesystem; these are the globs Hadoop 0.20 supports:
glob     comment
?        Matches any single character.
*        Matches zero or more characters.
[abc]    Matches a single character from the character set (a, b, c).
[a-z]    Matches a single character from the character range (a..z), inclusive. The first character must be lexicographically less than or equal to the second character.
[^abc]   Matches a single character that is not in the character set (a, b, c). The ^ character must occur immediately to the right of the opening bracket.
[^a-z]   Matches a single character that is not from the character range (a..z), inclusive. The ^ character must occur immediately to the right of the opening bracket.
\c       Removes (escapes) any special meaning of character c.
{ab,cd}  Matches a string from the string set {ab, cd}.
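For example (hypothetical paths), a glob can pick up a whole month of daily log files in one statement:
wlogs = load 'clicks/2009-12-*' as (url, pageid, timestamp); -- one file per day, all loaded together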
- as defines a schema; it appears in load ... [as (ColumnName[:type])] and in foreach ... generate ColumnName [as newColumnName].
- store writes data out; the default is using PigStorage with TAB as the delimiter.
store processed into '/data/examples/processed';
A full path such as hdfs:///data/examples/processed also works.
using can select another storage function or another delimiter:
store processed into 'processed' using HBaseStorage();
store processed into 'processed' using PigStorage(',');
Note: the output is not a single file but a set of part files, one per reduce task.
- foreach ... generate supports * and the range form begin..end:
* matches every field, and can also be passed to a UDF;
begin..end projects the fields from begin through end, inclusive:
prices = load 'NYSE_daily' as (exchange, symbol, date, open,
high, low, close, volume, adj_close);
beginning = foreach prices generate ..open;      -- produces exchange, symbol, date, open
middle    = foreach prices generate open..close; -- produces open, high, low, close
end       = foreach prices generate volume..;    -- produces volume, adj_close
Normally the fields generated by foreach keep their original names and types, but a field built from an expression does not; give it a name with as after the expression:
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
date:chararray, dividends:float);
sym = foreach divs generate symbol;
describe sym;
sym: {symbol: chararray}
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
date:chararray, dividends:float);
in_cents = foreach divs generate dividends * 100.0 as dividend, dividends * 100.0;
describe in_cents;
in_cents: {dividend: double,double}
- # performs map lookup; . projects a field out of a tuple:
bball = load 'baseball' as (name:chararray, team:chararray,
position:bag{t:(p:chararray)}, bat:map[]);
avg = foreach bball generate bat#'batting_average';
A = load 'input' as (t:tuple(x:int, y:int));
B = foreach A generate t.x, t.$1;
Projecting fields out of a bag:
A = load 'input' as (b:bag{t:(x:int, y:int)});
B = foreach A generate b.x;
A = load 'input' as (b:bag{t:(x:int, y:int)});
B = foreach A generate b.(x, y);
The following statement will not run:
A = load 'foo' as (x:chararray, y:int, z:int);
B = group A by x; -- produces bag A containing all the records for a given value of x
C = foreach B generate SUM(A.y + A.z);
because A.y and A.z are both bags, and the + operator is not defined on bags.
The correct approach:
A = load 'foo' as (x:chararray, y:int, z:int);
A1 = foreach A generate x, y + z as yz;
B = group A1 by x;
C = foreach B generate SUM(A1.yz);
- Nesting other statements inside foreach:
--distinct_symbols.pig
daily = load 'NYSE_daily' as (exchange, symbol); -- not interested in other fields
grpd = group daily by exchange;
uniq_sym = foreach grpd {
    uniq_sym = distinct daily.symbol;
    generate group, COUNT(uniq_sym);
};
Note: inside a foreach block only distinct, filter, limit, and order are supported, and the last statement must be generate:
--double_distinct.pig
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray);
grpd = group divs all;
uniq = foreach grpd {
    uniq_exchanges = distinct divs.exchange;
    uniq_symbols   = distinct divs.symbol;
    generate COUNT(uniq_exchanges), COUNT(uniq_symbols);
};
- flatten removes a level of bag nesting:
--flatten.pig
players = load 'baseball' as (name:chararray, team:chararray,
position:bag{t:(p:chararray)}, bat:map[]);
pos = foreach players generate name, flatten(position) as position;
flatten drops records whose bag is null or empty; to keep them, substitute a placeholder first:
--flatten_noempty.pig
players = load 'baseball' as (name:chararray, team:chararray,
position:bag{t:(p:chararray)}, bat:map[]);
noempty = foreach players generate name,
    ((position is null or IsEmpty(position)) ? {('unknown')} : position) as position;
pos = foreach noempty generate name, flatten(position) as position;
- filter (note: Pig's boolean expressions short-circuit).
Note: null == anything evaluates to null rather than true or false, so such records never pass an equality filter.
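So to act on null keys, test for null explicitly; a minimal sketch:
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
hasdivs = filter divs by dividends is not null; -- "dividends == null" would never evaluate to true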
- filter combined with matches applies a regular expression (put not before matches to invert the test).
Pig regular expressions use the same syntax as Java's; see the java.util.regex.Pattern documentation.
Special characters are escaped by writing \\ followed by the character's Unicode escape code:
character  escape code
.          \u002E
$          \u0024
^          \u005E
{          \u007B
[          \u005B
(          \u0028
|          \u007C
)          \u0029
*          \u002A
+          \u002B
?          \u003F
\          \u005C
The following example uses the escaped dot to keep only records whose symbol does not contain "CM.":
-- filter_not_matches.pig
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
date:chararray, dividends:float);
notstartswithcm = filter divs by not symbol matches '.*CM\\u002E.*';
- Each record produced by group has two parts: the key, in a field always named group (the name cannot be changed), and a bag, named after the grouped relation and carrying its original schema, that holds every input record with that key.
Parenthesize several fields to group on all of them at once; group ... all groups every record together (note: no by when using all):
--twokey.pig
daily = load 'NYSE_daily' as (exchange, stock, date, dividends);
grpd = group daily by (exchange, stock);
avg = foreach grpd generate group, AVG(daily.dividends);
grpd: {group: (exchange: bytearray,stock: bytearray),daily: {exchange: bytearray,
stock: bytearray,date: bytearray,dividends: bytearray}}
--countall.pig
daily = load 'NYSE_daily' as (exchange, stock);
grpd = group daily all;
cnt = foreach grpd generate COUNT(daily);
- cogroup groups several relations at once.
Note: all records whose key is null are gathered into the same group; cogroup matches group on this point and differs from join.
A = load 'input1' as (id:int, val:float);
B = load 'input2' as (id:int, val2:int);
C = cogroup A by id, B by id;
describe C;
C: {group: int,A: {id: int,val: float},B: {id: int,val2: int}}
- order by
Sorting on a single column:
--order.pig
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
date:chararray, open:float, high:float, low:float, close:float,
volume:int, adj_close:float);
bydate = order daily by date;
Sorting on multiple columns:
--order2key.pig
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
date:chararray, open:float, high:float, low:float,
close:float, volume:int, adj_close:float);
bydatensymbol = order daily by date, symbol;
The desc keyword sorts a column in descending order; null sorts lower than every other value:
--orderdesc.pig
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
date:chararray, open:float, high:float, low:float, close:float,
volume:int, adj_close:float);
byclose = order daily by close desc, open; -- open still sorted in ascending order
- distinct removes duplicates of entire records only; it cannot deduplicate on selected columns:
--distinct.pig
-- find a distinct list of ticker symbols for each exchange
-- This load will truncate the records, picking up just the first two fields.
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray);
uniq = distinct daily;
- join / left join / right join
Note: unlike group, a null key matches nothing, so records with null keys drop out of a join.
-- join2key.pig
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
volume, adj_close);
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
jnd = join daily by (symbol, date), divs by (symbol, date);
--leftjoin.pig
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
volume, adj_close);
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
jnd = join daily by (symbol, date) left outer, divs by (symbol, date);
More than two relations can be joined at once, but only with an inner join:
A = load 'input1' as (x, y);
B = load 'input2' as (u, v);
C = load 'input3' as (e, f);
alpha = join A by x, B by u, C by e;
A relation can also be joined with itself, but the data must be loaded twice:
--selfjoin.pig
-- For each stock, find all dividends that increased between two dates
divs1 = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
date:chararray, dividends);
divs2 = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
date:chararray, dividends);
jnd = join divs1 by symbol, divs2 by symbol;
increased = filter jnd by divs1::date < divs2::date and
divs1::dividends < divs2::dividends;
The following does not work:
--selfjoin.pig
-- For each stock, find all dividends that increased between two dates
divs1 = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
date:chararray, dividends);
jnd = join divs1 by symbol, divs1 by symbol;
increased = filter jnd by divs1::date < divs2::date and
divs1::dividends < divs2::dividends;
- union works like SQL's UNION, except that Pig will union two relations with different schemas: if the schemas are identical, the result keeps that schema; if one schema can be produced from the other by implicit casts, the result takes the cast-to schema; otherwise the result has no schema.
A = load 'input1' as (x:int, y:float);
B = load 'input2' as (x:int, y:float);
C = union A, B;
describe C;
C: {x: int,y: float}
A = load 'input1' as (x:double, y:float);
B = load 'input2' as (x:int, y:double);
C = union A, B;
describe C;
C: {x: double,y: double}
A = load 'input1' as (x:int, y:float);
B = load 'input2' as (x:int, y:chararray);
C = union A, B;
describe C;
Schema for C unknown.
Note: in Pig 1.0 this last kind of union cannot be executed.
Note: union does not remove duplicate rows.
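To get SQL UNION's duplicate removal, follow the union with a distinct:
C = union A, B;
D = distinct C; -- union alone behaves like SQL's UNION ALL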
To union two relations whose column names differ, use the onschema keyword:
A = load 'input1' as (w: chararray, x:int, y:float);
B = load 'input2' as (x:int, y:double, z:chararray);
C = union onschema A, B;
describe C;
C: {w: chararray,x: int,y: double,z: chararray}
- cross computes the Cartesian product: with inputs of m rows and n rows, the output has m*n rows.
--thetajoin.pig
--I recommend running this one on a cluster too
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
date:chararray, open:float, high:float, low:float,
close:float, volume:int, adj_close:float);
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
date:chararray, dividends:float);
crossed = cross daily, divs;
tjnd = filter crossed by daily::date < divs::date;
- limit returns only the first N rows:
--limit.pig
divs = load 'NYSE_dividends';
first10 = limit divs 10;
Apart from the output of order by, nothing in Pig has a fixed row order, so the program above returns different rows from run to run.
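To make the selection repeatable, sort before limiting; a small sketch:
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
srtd = order divs by dividends desc;
first10 = limit srtd 10; -- always the 10 largest dividends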
- sample draws a random subset of the data, handy for building test data sets; the fraction to keep is given as a parameter. The program below keeps roughly 10% of the rows:
--sample.pig
divs = load 'NYSE_dividends';
some = sample divs 0.1;
- parallel sets the number of Pig's reduce tasks:
--parallel.pig
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
volume, adj_close);
bysymbl = group daily by symbol parallel 10;
parallel applies only to its own statement; to give every statement in the script 10 reduce tasks, use the set default_parallel 10 command:
--defaultparallel.pig
set default_parallel 10;
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
volume, adj_close);
bysymbl = group daily by symbol;
average = foreach bysymbl generate group, AVG(daily.close) as avg;
If parallel and set default_parallel are both used, the parallel clause overrides set default_parallel.
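A minimal sketch of that precedence:
set default_parallel 10;
daily = load 'NYSE_daily' as (exchange, symbol, date, close);
bysymbl = group daily by symbol parallel 20; -- this group runs with 20 reducers, not 10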
- register makes the UDFs in a jar file available to the script:
--register.pig
register 'your_path_to_piggybank/piggybank.jar';
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
date:chararray, dividends:float);
backwards = foreach divs generate
org.apache.pig.piggybank.evaluation.string.Reverse(symbol);
- define gives a UDF an alias:
--define.pig
register 'your_path_to_piggybank/piggybank.jar';
define reverse org.apache.pig.piggybank.evaluation.string.Reverse();
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
date:chararray, dividends:float);
backwards = foreach divs generate reverse(symbol);
A UDF whose constructor takes arguments:
--define_constructor_args.pig
register 'acme.jar';
define convert com.acme.financial.CurrencyConverter('dollar', 'euro');
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
date:chararray, dividends:float);
backwards = foreach divs generate convert(dividends);
- Invoking static Java methods directly (less efficient than a real UDF):
--invoker.pig
define hex InvokeForString('java.lang.Integer.toHexString', 'int');
divs = load 'NYSE_daily' as (exchange, symbol, date, open, high, low,
close, volume, adj_close);
nonnull = filter divs by volume is not null;
inhex = foreach nonnull generate symbol, hex((int)volume);
If the method takes an array argument, pass it a bag:
define stdev InvokeForDouble('com.acme.Stats.stdev', 'double[]');
A = load 'input' as (id: int, dp:double);
B = group A by id;
C = foreach B generate group, stdev(A.dp);
- multiquery: a script may store several results, and Pig folds the shared work into as few jobs as it can:
--multiquery.pig
players = load 'baseball' as (name:chararray, team:chararray,
    position:bag{t:(p:chararray)}, bat:map[]);
pwithba = foreach players generate name, team, position,
    bat#'batting_average' as batavg;
byteam = group pwithba by team;
avgbyteam = foreach byteam generate group, AVG(pwithba.batavg);
store avgbyteam into 'by_team';
flattenpos = foreach pwithba generate name, team,
    flatten(position) as position, batavg;
bypos = group flattenpos by position;
avgbypos = foreach bypos generate group, AVG(flattenpos.batavg);
store avgbypos into 'by_position';
- split routes each record into one or more relations based on conditions. The timestamp cutoffs and store paths in this example were lost in extraction; the values below are illustrative:
wlogs = load 'weblogs' as (pageid, url, timestamp);
split wlogs into apr03 if timestamp < '20110404',
    apr02 if timestamp < '20110403' and timestamp > '20110401',
    apr01 if timestamp < '20110402' and timestamp > '20110331';
store apr03 into '20110403';
store apr02 into '20110402';
store apr01 into '20110401';
- set configures the Pig environment. Supported parameters:
parameter         value type  description
debug             string      Sets the logging level to DEBUG. Equivalent to passing -debug DEBUG on the command line.
default_parallel  integer     Sets a default parallel level for all reduce operations in the script.
job.name          string      Assigns a name to the Hadoop job. By default the name is the filename of the script being run, or a randomly generated name for interactive sessions.
job.priority      string      If your Hadoop cluster is using the Capacity Scheduler with priorities enabled for queues, this allows you to set the priority of your Pig job. Allowed values are very_low, low, normal, high, very_high.
- parameter: passing parameters into a Pig script:
--daily.pig
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
date:chararray, open:float, high:float, low:float, close:float,
volume:int, adj_close:float);
yesterday = filter daily by date == '$DATE';
grpd = group yesterday all;
minmax = foreach grpd generate MAX(yesterday.high), MIN(yesterday.low);
Pass parameters with -p, one -p per parameter:
pig -p DATE=2009-12-17 daily.pig
Parameters can also be put in a file, one per line, with comments starting with #; point Pig at the file with -m or -param_file:
wlogs = load 'clicks/$YEAR$MONTH01' as (url, pageid, timestamp);
#Param file
YEAR=2009-
MONTH=12-
DAY=17
DATE=$YEAR$MONTH$DAY
pig -param_file daily.params daily.pig
Parameters can also be defined inside the script itself with %declare or %default; %default supplies a default value that can be overridden.
Note: %declare and %default cannot be used in a script that is itself not a macro and is invoked by another script (when the script is run directly, they are fine):
%default parallel_factor 10;
wlogs = load 'clicks' as (url, pageid, timestamp);
grp = group wlogs by pageid parallel $parallel_factor;
cnt = foreach grp generate group, COUNT(wlogs);
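Because %default only supplies a fallback, a value passed on the command line wins (the script name here is hypothetical):
pig -p parallel_factor=20 wlogs.pig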
- define can also declare a macro, something like a subroutine:
--macro.pig
-- Given daily input and a particular year, analyze how
-- stock prices changed on days dividends were paid out.
define dividend_analysis (daily, year, daily_symbol, daily_open, daily_close)
returns analyzed {
divs = load 'NYSE_dividends' as (exchange:chararray,
    symbol:chararray, date:chararray, dividends:float);
divsthisyear = filter divs by date matches '$year-.*';
dailythisyear = filter $daily by date matches '$year-.*';
jnd = join divsthisyear by symbol, dailythisyear by $daily_symbol;
$analyzed = foreach jnd generate dailythisyear::$daily_symbol,
    $daily_close - $daily_open;
};
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
date:chararray, open:float, high:float, low:float, close:float,
volume:int, adj_close:float);
results = dividend_analysis(daily, '2009', 'symbol', 'open', 'close');
- import pulls in another Pig file, as if it were spliced into the script and executed once. The imported file may only contain macro definitions, not its own relation definitions:
--main.pig
import '../examples/ch6/dividend_analysis.pig';
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
date:chararray, open:float, high:float, low:float, close:float,
volume:int, adj_close:float);
results = dividend_analysis(daily, '2009', 'symbol', 'open', 'close');
Imports are searched for in the current directory by default; set pig.import.search.path to add search paths:
set pig.import.search.path '/usr/local/pig,/grid/pig';
import 'acme/macros.pig';