hadoop--mapreduce code: multi-table join

 
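The program below implements a multi-table join in MapReduce. The input consists of two tables that share an address ID column: a left table whose data rows have the form factoryname addressID, and a right table whose rows have the form addressID addressname. The mapper tags each row with the table it came from, the shuffle groups rows by address ID, and the reducer emits the Cartesian product of factory names and address names for each ID.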
package com.hadoop.sample;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class MTJoin {
	private static int time = 0;
	public static class Map extends Mapper<Object,Text,Text,Text>{
		//In map, first decide whether the input line belongs to the left or the right table,
		//then split its two columns: the join column goes into the key, and the remaining
		//column plus a left/right-table tag go into the value, which is then emitted
		public void map(Object key,Text value,Context context) throws IOException,InterruptedException{
			String line = value.toString();
			int i = 0;
			//skip the header line of each input file
			if(line.contains("factoryname")||line.contains("addressID")){
				return;
			}
			//find the split point: the first digit in the line
			//(bounds-checked, so rows without a digit are skipped instead of
			//throwing StringIndexOutOfBoundsException)
			while(i < line.length() && (line.charAt(i) < '0' || line.charAt(i) > '9')){
				i++;
			}
			if(i == line.length()){
				return;
			}
			if(line.charAt(0) < '0' || line.charAt(0) > '9'){
				//left table: factoryname addressID
				int j = i-1;
				while(line.charAt(j)!=' ') j--;
				String[] values = {line.substring(0, j),line.substring(i)};
				context.write(new Text(values[1]), new Text("1+"+values[0]));
			}else{
				//right table: addressID addressname
				int j = i+1;
				while(line.charAt(j)!=' ') j++;
				String[] values = {line.substring(0, j),line.substring(j+1)};
				context.write(new Text(values[0]), new Text("2+"+values[1]));
			}
		}
	}
	public static class Reduce extends Reducer<Text,Text,Text,Text>{
		//reduce parses the map output, collects the values into left-table and
		//right-table lists, and then emits their Cartesian product
		public void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException{
			if(time == 0){//emit the output header exactly once
				context.write(new Text("factoryname"),new Text("addressname"));
				time++;
			}
			int factorynum = 0;
			String factory[] = new String[10];
			int addressnum = 0;
			String address[] = new String[10];
			Iterator<Text> iter = values.iterator();
			while(iter.hasNext()){
				String record = iter.next().toString();
				char type = record.charAt(0);
				if(type == '1'){//left table
					factory[factorynum] = record.substring(2);
					factorynum++;
				}else{//right table
					address[addressnum] = record.substring(2);
					addressnum++;
				}
			}
			if(factorynum!=0&&addressnum!=0){//Cartesian product
				for(int m=0;m<factorynum;m++){
					for(int n=0;n<addressnum;n++){
						context.write(new Text(factory[m]), new Text(address[n]));
					}
				}
			}
		}
	}
	/**
	 * @param args
	 */
	public static void main(String[] args) throws Exception{
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
		if(otherArgs.length != 2){
			System.err.println("Usage: MTJoin <in> <out>");
			System.exit(2);
		}
		Job job = new Job(conf,"multi table join");
		job.setJarByClass(MTJoin.class);
		job.setMapperClass(Map.class);
		//no combiner here: running the reduce logic on partial map output would
		//strip the left/right-table tags before the real join happens
		job.setReducerClass(Reduce.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

}
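To make the mapper's parsing concrete, here is a standalone sketch that runs the same split-point scan on two made-up sample rows (SplitPointDemo, the factory name, the address ID, and the city are illustrative assumptions, not data from the original post):

public class SplitPointDemo {
	public static void main(String[] args) {
		//hypothetical rows: one left-table line, one right-table line
		String[] lines = {"Beijing Red Star 1", "1 Beijing"};
		for (String line : lines) {
			//same scan as the mapper: advance to the first digit
			int i = 0;
			while (i < line.length() && (line.charAt(i) < '0' || line.charAt(i) > '9')) i++;
			if (line.charAt(0) < '0' || line.charAt(0) > '9') {
				//left table: the join key is the trailing addressID
				int j = i - 1;
				while (line.charAt(j) != ' ') j--;
				System.out.println("key=" + line.substring(i) + "  value=1+" + line.substring(0, j));
			} else {
				//right table: the join key is the leading addressID
				int j = i + 1;
				while (line.charAt(j) != ' ') j++;
				System.out.println("key=" + line.substring(0, j) + "  value=2+" + line.substring(j + 1));
			}
		}
	}
}

Both sample rows map to key 1, so the reducer would receive them in a single group and pair Beijing Red Star with Beijing in its Cartesian-product output.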
Comments
#1 kingding 2016-04-11
It fails at runtime:
java.lang.Exception: java.lang.StringIndexOutOfBoundsException: String index out of range: 26
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: 26
at java.lang.String.charAt(String.java:658)
at com.hqmart.hadoop.MTjoin$Map.map(MTjoin.java:32)
at com.hqmart.hadoop.MTjoin$Map.map(MTjoin.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/04/11 14:40:38 INFO mapreduce.Job:  map 100% reduce 0%
16/04/11 14:40:38 INFO mapreduce.Job: Job job_local85085256_0001 failed with state FAILED due to: NA
16/04/11 14:40:38 INFO mapreduce.Job: Counters: 26
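This StringIndexOutOfBoundsException comes from the unguarded scan loop in the original map(): the condition line.charAt(i)>='9'||line.charAt(i)<='0' stays true for '0' and '9' as well as for every non-digit, and charAt(i) is called without a bounds check, so any row containing no digit between '1' and '8' scans past the end of the string. A minimal bounds-checked sketch of the same scan (findFirstDigit is a hypothetical helper name, not from the original code):

	//hypothetical helper: index of the first decimal digit in line, or -1 if
	//there is none, so callers can skip malformed rows instead of crashing in charAt()
	static int findFirstDigit(String line) {
		int i = 0;
		while (i < line.length() && (line.charAt(i) < '0' || line.charAt(i) > '9')) {
			i++;
		}
		return i == line.length() ? -1 : i;
	}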
