0%

第三次作业

2018年12月24日 下午3:54

计算单词共现矩阵

开发环境截图:

执行结果:


代码

OccurrenceMatrix.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
package mp.occurrence_matrix;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class OccurrenceMatrix {


/**
 * Mapper for the "pairs" formulation of the word co-occurrence matrix.
 *
 * <p>Each input line is split on single punctuation or whitespace characters. For every
 * non-empty term the mapper collects the terms in its neighbor window and emits one
 * {@code (term, neighbor) -> count} record per distinct neighbor, where {@code count} is the
 * number of times that neighbor occurs inside the window of this one term occurrence.
 */
public static class Co_OccurrenceMatrixMapperWithPair extends Mapper<LongWritable, Text, TextPair, DoubleWritable> {

    /** Number of neighbor slots examined around each term. */
    private static final int WINDOW_SIZE = 3;

    /**
     * Term separator (one punctuation or whitespace character), compiled once instead of on
     * every record. Consecutive separators produce empty tokens, which {@code map} skips.
     */
    private static final java.util.regex.Pattern TERM_SEPARATOR =
            java.util.regex.Pattern.compile("[\\p{P}\\s]");

    /**
     * Emits the per-occurrence neighbor counts of every term on one input line.
     *
     * @param inputKey   key supplied by the input format; not used
     * @param inputValue one line of text
     * @param context    receives the {@code (term, neighbor) -> count} records
     */
    @Override
    public void map(LongWritable inputKey, Text inputValue, Context context)
            throws IOException, InterruptedException {
        String[] allTerms = TERM_SEPARATOR.split(inputValue.toString());
        for (int i = 0; i < allTerms.length; i++) {
            String term = allTerms[i];
            if (term == null || term.isEmpty()) {
                continue;
            }

            // Count by neighbor directly; the owning term is already known, so there is no
            // need to build a "term,neighbor" string and split it apart again.
            Map<String, Integer> neighborCounts = new HashMap<String, Integer>();
            for (String nbTerm : neighborsOfTerm(term, i, allTerms, WINDOW_SIZE)) {
                if (nbTerm == null || nbTerm.isEmpty()) {
                    continue;
                }
                Integer seen = neighborCounts.get(nbTerm);
                neighborCounts.put(nbTerm, seen == null ? 1 : seen + 1);
            }

            for (Entry<String, Integer> entry : neighborCounts.entrySet()) {
                context.write(new TextPair(term, entry.getKey()),
                        new DoubleWritable(entry.getValue()));
            }
        }
    }

    /**
     * Returns the terms surrounding position {@code pos}.
     *
     * <p>The window normally reaches {@code windowSize / 2} terms to the left and
     * {@code windowSize - windowSize / 2} terms to the right. Near the start or the end of
     * the line it is shifted towards the other side, and it is always clamped to the bounds
     * of {@code allterms}. The term at {@code pos} itself is excluded by index, so another
     * occurrence of the same word inside the window still counts as a neighbor.
     *
     * @param term       the term at {@code pos}; kept for interface compatibility, not read
     * @param pos        index of the term in {@code allterms}
     * @param allterms   all tokens of the line (may contain empty strings)
     * @param windowSize maximum number of neighbors to return
     * @return an array of length {@code windowSize}; neighbors fill the leading slots in
     *         line order and unused trailing slots are {@code null}. All slots are
     *         {@code null} when {@code pos} is outside {@code allterms}.
     */
    public String[] neighborsOfTerm(String term, int pos, String[] allterms, int windowSize) {
        String[] neighbors = new String[windowSize];
        int count = allterms.length;
        if (pos < 0 || pos >= count) {
            return neighbors;
        }

        int leftOffSet;
        int rightOffSet;
        if (pos < windowSize / 2) {
            leftOffSet = pos;
            rightOffSet = windowSize - leftOffSet;
        } else if (pos >= count - 1 - windowSize / 2) {
            rightOffSet = count - 1 - pos;
            leftOffSet = windowSize - rightOffSet;
        } else {
            leftOffSet = windowSize / 2;
            rightOffSet = windowSize - leftOffSet;
        }

        // Clamp instead of testing "i >= 0" in the loop condition: on short lines the
        // shifted window can start before index 0, which used to end the loop immediately
        // and drop every neighbor.
        int first = Math.max(0, pos - leftOffSet);
        int last = Math.min(count - 1, pos + rightOffSet);
        int j = 0;
        for (int i = first; i <= last; i++) {
            if (i != pos) {
                neighbors[j] = allterms[i];
                j++;
            }
        }
        return neighbors;
    }
}

/**
 * Reducer for the "pairs" formulation: adds up every partial count received for one
 * {@code (term, neighbor)} key and emits the total under the same key.
 *
 * <p>Input and output types are identical, which is why the job driver in this file can
 * also register the class as the combiner.
 */
public static class Co_OccurrenceMatrixReducerWithPair extends Reducer<TextPair, DoubleWritable, TextPair, DoubleWritable> {

    /**
     * Sums the values of one key.
     *
     * @param inputKey    the {@code (term, neighbor)} pair
     * @param inputValues partial counts for that pair
     * @param context     receives the single {@code (pair, total)} record
     */
    @Override
    public void reduce(TextPair inputKey, Iterable<DoubleWritable> inputValues, Context context)
            throws IOException, InterruptedException {
        // Accumulate in double: with an int accumulator, "sum += value" performs an implicit
        // narrowing cast on every step, truncating fractions and saturating large totals.
        double sum = 0.0;
        for (DoubleWritable inC : inputValues) {
            sum += inC.get();
        }
        context.write(inputKey, new DoubleWritable(sum));
    }
}


/**
 * Mapper for the "stripes" formulation of the word co-occurrence matrix.
 *
 * <p>Each input line is split on single punctuation or whitespace characters. For every
 * non-empty term the mapper emits one record whose key is the term and whose value is a
 * {@link TextStripe} mapping each neighbor in the term's window to the number of times it
 * occurs there.
 */
public static class Co_OccurrenceMatrixMapperWithStripe extends Mapper<LongWritable, Text, Text, TextStripe> {

    /** Number of neighbor slots examined around each term. */
    private static final int WINDOW_SIZE = 3;

    /**
     * Term separator (one punctuation or whitespace character), compiled once instead of on
     * every record. Consecutive separators produce empty tokens, which {@code map} skips.
     */
    private static final java.util.regex.Pattern TERM_SEPARATOR =
            java.util.regex.Pattern.compile("[\\p{P}\\s]");

    /**
     * Emits one stripe per term occurrence on the input line.
     *
     * @param inputKey   key supplied by the input format; not used
     * @param inputValue one line of text
     * @param context    receives the {@code term -> stripe} records
     */
    @Override
    public void map(LongWritable inputKey, Text inputValue, Context context)
            throws IOException, InterruptedException {
        String[] allTerms = TERM_SEPARATOR.split(inputValue.toString());
        for (int i = 0; i < allTerms.length; i++) {
            String term = allTerms[i];
            if (term == null || term.isEmpty()) {
                continue;
            }

            TextStripe termTS = new TextStripe();
            for (String nbTerm : neighborsOfTerm(term, i, allTerms, WINDOW_SIZE)) {
                if (nbTerm == null || nbTerm.isEmpty()) {
                    continue;
                }
                Text coTerm = new Text(nbTerm);
                DoubleWritable previous = (DoubleWritable) termTS.get(coTerm);
                double updated = (previous == null) ? 1.0 : previous.get() + 1.0;
                termTS.put(coTerm, new DoubleWritable(updated));
            }
            // A stripe is written even when it is empty, i.e. the term has no neighbors.
            context.write(new Text(term), termTS);
        }
    }

    /**
     * Returns the terms surrounding position {@code pos}.
     *
     * <p>The window normally reaches {@code windowSize / 2} terms to the left and
     * {@code windowSize - windowSize / 2} terms to the right. Near the start or the end of
     * the line it is shifted towards the other side, and it is always clamped to the bounds
     * of {@code allterms}. The term at {@code pos} itself is excluded by index, so another
     * occurrence of the same word inside the window still counts as a neighbor.
     *
     * @param term       the term at {@code pos}; kept for interface compatibility, not read
     * @param pos        index of the term in {@code allterms}
     * @param allterms   all tokens of the line (may contain empty strings)
     * @param windowSize maximum number of neighbors to return
     * @return an array of length {@code windowSize}; neighbors fill the leading slots in
     *         line order and unused trailing slots are {@code null}. All slots are
     *         {@code null} when {@code pos} is outside {@code allterms}.
     */
    public String[] neighborsOfTerm(String term, int pos, String[] allterms, int windowSize) {
        String[] neighbors = new String[windowSize];
        int count = allterms.length;
        if (pos < 0 || pos >= count) {
            return neighbors;
        }

        int leftOffSet;
        int rightOffSet;
        if (pos < windowSize / 2) {
            leftOffSet = pos;
            rightOffSet = windowSize - leftOffSet;
        } else if (pos >= count - 1 - windowSize / 2) {
            rightOffSet = count - 1 - pos;
            leftOffSet = windowSize - rightOffSet;
        } else {
            leftOffSet = windowSize / 2;
            rightOffSet = windowSize - leftOffSet;
        }

        // Clamp instead of testing "i >= 0" in the loop condition: on short lines the
        // shifted window can start before index 0, which used to end the loop immediately
        // and drop every neighbor.
        int first = Math.max(0, pos - leftOffSet);
        int last = Math.min(count - 1, pos + rightOffSet);
        int j = 0;
        for (int i = first; i <= last; i++) {
            if (i != pos) {
                neighbors[j] = allterms[i];
                j++;
            }
        }
        return neighbors;
    }
}


/**
 * Reducer for the "stripes" formulation: folds every stripe received for one term into a
 * single accumulator stripe and writes that accumulator under the same key.
 */
public static class Co_OccurrenceMatrixReducerWithStripe extends Reducer<Text, TextStripe, Text, TextStripe> {

    /**
     * Merges all stripes of one term.
     *
     * @param inputKey    the term
     * @param inputValues the stripes emitted for that term
     * @param context     receives the single {@code (term, merged stripe)} record
     */
    @Override
    public void reduce(Text inputKey, Iterable<TextStripe> inputValues, Context context)
            throws IOException, InterruptedException {
        final TextStripe merged = new TextStripe();
        for (final TextStripe partial : inputValues) {
            // Static type TextStripe selects the TextStripe#putAll(TextStripe) overload.
            merged.putAll(partial);
        }
        context.write(inputKey, merged);
    }
}




/**
 * Configures and runs the pairs-based co-occurrence job: text input, text output,
 * {@code Co_OccurrenceMatrixMapperWithPair} as mapper and
 * {@code Co_OccurrenceMatrixReducerWithPair} as both combiner and reducer.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} the output path
 */
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

    if (args.length < 2) {
        System.err.println("Usage: OccurrenceMatrix <input path> <output path>");
        System.exit(2);
    }

    Configuration conf = new Configuration();
    // Job.getInstance replaces the deprecated Job constructor.
    Job co_occurrenceMatrixJob = Job.getInstance(conf, "co_correnceMatrixJob");

    // Must name the enclosing class; "Co_OccurrenceMatrix" does not exist in this file.
    co_occurrenceMatrixJob.setJarByClass(OccurrenceMatrix.class);

    FileInputFormat.setInputPaths(co_occurrenceMatrixJob, new Path(args[0]));
    FileOutputFormat.setOutputPath(co_occurrenceMatrixJob, new Path(args[1]));

    co_occurrenceMatrixJob.setInputFormatClass(TextInputFormat.class);
    co_occurrenceMatrixJob.setOutputFormatClass(TextOutputFormat.class);

    co_occurrenceMatrixJob.setOutputKeyClass(TextPair.class);
    co_occurrenceMatrixJob.setOutputValueClass(DoubleWritable.class);

    co_occurrenceMatrixJob.setMapperClass(Co_OccurrenceMatrixMapperWithPair.class);
    co_occurrenceMatrixJob.setCombinerClass(Co_OccurrenceMatrixReducerWithPair.class);
    co_occurrenceMatrixJob.setReducerClass(Co_OccurrenceMatrixReducerWithPair.class);

    if (co_occurrenceMatrixJob.waitForCompletion(true)) {
        System.out.println("作业运行完毕!");
    } else {
        // Report the failure to the calling shell instead of exiting with status 0.
        System.exit(1);
    }
}

}

TextPair.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package mp.co_occurrence_matrix;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;

/**
 * A pair of {@link Text} values usable as a Hadoop key.
 *
 * <p>Pairs are ordered by their first element, then by their second element. A pair whose
 * second element is the literal {@code "*"} sorts before every other pair that shares the
 * same first element.
 */
public class TextPair implements WritableComparable<TextPair> {

    /** Second-element value that sorts ahead of all others within the same first element. */
    private static final String WILDCARD = "*";

    private Text first;
    private Text second;

    /** Creates a pair of two empty texts; needed so the framework can deserialize into it. */
    public TextPair() {
        set(new Text(), new Text());
    }

    /** Creates a pair from two strings. */
    public TextPair(String first, String second) {
        set(new Text(first), new Text(second));
    }

    /** Creates a pair that holds the given {@link Text} instances (no copy is made). */
    public TextPair(Text first, Text second) {
        set(first, second);
    }

    /** Replaces both elements with the given instances (no copy is made). */
    public void set(Text first, Text second) {
        this.first = first;
        this.second = second;
    }

    public Text getFirst() {
        return first;
    }

    public Text getSecond() {
        return second;
    }

    /** Reads the first and then the second element, mirroring {@link #write(DataOutput)}. */
    @Override
    public void readFields(DataInput arg0) throws IOException {
        first.readFields(arg0);
        second.readFields(arg0);
    }

    /** Writes the first and then the second element. */
    @Override
    public void write(DataOutput arg0) throws IOException {
        first.write(arg0);
        second.write(arg0);
    }

    /**
     * Orders by first element, then places a {@code "*"} second element before any other,
     * then orders by second element.
     *
     * <p>Two pairs that both carry {@code "*"} compare as equal. The earlier implementation
     * returned -1 in that case, which broke the symmetry required by {@link Comparable} and
     * disagreed with {@link #equals(Object)}.
     */
    @Override
    public int compareTo(TextPair tp) {
        int cmp = first.compareTo(tp.first);
        if (cmp != 0) {
            return cmp;
        }

        boolean thisIsWildcard = WILDCARD.equals(second.toString());
        boolean otherIsWildcard = WILDCARD.equals(tp.second.toString());
        if (thisIsWildcard && otherIsWildcard) {
            return 0;
        }
        if (thisIsWildcard) {
            return -1;
        }
        if (otherIsWildcard) {
            return 1;
        }
        return second.compareTo(tp.second);
    }

    @Override
    public int hashCode() {
        return first.hashCode() * 163 + second.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        if (o instanceof TextPair) {
            TextPair tp = (TextPair) o;
            return first.equals(tp.first) && second.equals(tp.second);
        }
        return false;
    }

    /** Returns {@code first + "," + second}. */
    @Override
    public String toString() {
        return first + "," + second;
    }
}

TextStripe.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package mp.co_occurrence_matrix;

import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class TextStripe extends MapWritable {
public TextStripe(){
super();
}

@Override
public String toString(){
String res = "";
for(Entry<Writable, Writable> entry : this.entrySet()) {
Text key = (Text) entry.getKey();
DoubleWritable value = (DoubleWritable) entry.getValue();
res += key.toString()+ ":" + value.get() + ";";
}
return res;
}

public void putAll(TextStripe ts) {
for(Entry<Writable, Writable> entry : ts.entrySet()) {
Text tsKey = (Text)entry.getKey();
DoubleWritable tsValue = (DoubleWritable)entry.getValue();
if(this.containsKey(tsKey)) {
double newValue = ((DoubleWritable)this.get(tsKey)).get() + tsValue.get();
this.put(tsKey, new DoubleWritable(newValue));
this.put(tsKey, tsValue);
}
}
}

public static void main(String args[]) {
TextStripe termTS = new TextStripe();

String[] terms = {"ab", "cd", "ef", "ab"};
for(String tnTerm : terms) {
Text co_term = new Text(tnTerm);
if(!termTS.containsKey(co_term)) {
termTS.put(co_term, new DoubleWritable(1));
} else {
DoubleWritable lastValue = (DoubleWritable) termTS.get(co_term);
double newValue = lastValue.get() + 1;
termTS.put(co_term, new DoubleWritable(newValue));
}
}

TextStripe termTS1 = new TextStripe();
String[] terms1 = {"cd", "cd", "ef", "ab", "we"};
for(String tnTerm : terms1) {
Text co_term = new Text(tnTerm);
if(!termTS1.containsKey(co_term)) {
termTS1.put(co_term, new DoubleWritable(1));
} else {
DoubleWritable lastValue = (DoubleWritable) termTS1.get(co_term);
double newValue = lastValue.get() + 1.0;
termTS1.put(co_term, new DoubleWritable(newValue));
}
}
TextStripe totalTS = new TextStripe();
totalTS.putAll(termTS1);
totalTS.putAll(termTS);
System.out.println(totalTS.toString());

double totalNumOfWords = 0.0;
for(Entry<Writable, Writable> entry: totalTS.entrySet()) {
DoubleWritable count = (DoubleWritable) entry.getValue();
totalNumOfWords += count.get();
}
for(Entry<Writable, Writable> entry: totalTS.entrySet()) {
Text word = (Text) entry.getKey();
DoubleWritable count = (DoubleWritable) entry.getValue();
double rfValue = count.get() / totalNumOfWords;
BigDecimal bd = new BigDecimal(rfValue);
rfValue = bd.setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();
totalTS.put(word, new DoubleWritable(rfValue));
}
System.out.println(totalTS.toString());

Map<TextPair, Double> test = new HashMap<TextPair, Double>();
TextPair t1 = new TextPair("a", "b");
TextPair t2 = new TextPair("c", "*");

test.put(t1, 1.2);
test.put(t2, 3.4);

for(Entry<TextPair, Double> entry: test.entrySet()) {
System.out.println(entry.getKey().toString());
System.out.println(entry.getKey().toString().split(",")[1]);
if(entry.getKey().toString().split(",")[1] == "*") {
System.out.println("HHHH");
}
System.out.println(entry.getValue());
}

}

}