1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
#!/usr/bin/python
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
import os
import sys
import logging
import argparse
# Parse command-line arguments.
#   -i/--inputdir   directory scanned for *.avro files (required)
#   -o/--outputfile path of the merged avro file to write (required)
#   -s/--schemafile avro schema used for the output (default: avro_schema.avsc)
#   -q/--quiet      sets args.verbose to False (it is True by default)
parser = argparse.ArgumentParser(
    description='Merge avro files into single avro file.')
parser.add_argument(
    "-i", "--inputdir", action="store", dest="input_dir",
    help="Input file directory", required=True)
parser.add_argument(
    "-o", "--outputfile", action="store", dest="output_file",
    help="Output file name", required=True)
parser.add_argument(
    "-s", "--schemafile", action="store", dest="schema_file",
    help="Schema file name, default is ./avro_schema.avsc",
    default="avro_schema.avsc")
parser.add_argument(
    "-q", "--quiet", action="store_false", dest="verbose",
    help="don't print log messages to stdout", default=True)
args = parser.parse_args()
# Configure logging: the root logger accepts everything from DEBUG up and
# forwards it to a single stdout handler. The handler's own level decides
# what is actually printed: only ERROR and above when --quiet was given
# (args.verbose is False), everything from DEBUG up otherwise.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch = logging.StreamHandler(sys.stdout)
ch.setFormatter(formatter)
ch.setLevel(logging.DEBUG if args.verbose else logging.ERROR)
logger.addHandler(ch)
# Merge every *.avro container found in input_dir into output_file.
#
# Following the layout this script was written for, the first record of each
# input file is treated as a metadata header ("avrometa") and the last record
# as a stats footer ("avrostats"). The merged output contains the header of
# the first file processed, the data records of every file, and finally the
# footer of the first file processed.
input_dir = args.input_dir
output_file = args.output_file
schema_file = args.schema_file

# Sentinel meaning "not captured yet". A dedicated object is used so that no
# real record value can ever be mistaken for the unset state.
_UNSET = object()
avrometa = _UNSET
avrostats = _UNSET
avrorecords = []

logging.info('Start writing merged data into: %s', output_file)

# Close the schema file as soon as it has been read.
with open(schema_file) as schema_fh:
    schema = avro.schema.parse(schema_fh.read())

output_path = os.path.abspath(output_file)

# Avro container files are binary, so they must be opened in binary mode.
writer = DataFileWriter(open(output_file, "wb"), DatumWriter(), schema, 'deflate')
try:
    # os.listdir() returns names in arbitrary order; sorting makes the choice
    # of "first file" (whose header and footer are kept) deterministic.
    for target_file in sorted(os.listdir(input_dir)):
        if not target_file.endswith(".avro"):
            continue

        target_path = os.path.join(input_dir, target_file)
        if os.path.abspath(target_path) == output_path:
            # The output lives inside the input directory: never read the
            # file that is currently being written.
            continue

        logging.info('merging: %s', target_file)

        input_fh = None
        try:
            input_fh = open(target_path, "rb")
            target_rows = DataFileReader(input_fh, DatumReader())
        except Exception as e:
            # Do not leak the handle when the file is not a readable avro
            # container; callers still see the original exception type.
            if input_fh is not None:
                input_fh.close()
            raise avro.schema.AvroException(e)

        try:
            # The first record is the header; everything after it is
            # buffered so that the trailing footer can be removed.
            header = next(target_rows, _UNSET)
            avrorecords = list(target_rows)
        finally:
            target_rows.close()

        if header is _UNSET:
            logging.warning('skipping empty file: %s', target_file)
            continue

        # Only the very first header is written; later ones are dropped.
        if avrometa is _UNSET:
            avrometa = header
            writer.append(avrometa)
            logging.info('avrometa: %s', avrometa)

        if not avrorecords:
            # The file held a header only: no footer and no data records.
            logging.warning('no records after header in: %s', target_file)
            continue

        # Strip the footer from every file, remembering only the first one;
        # it is written once, after all data records.
        footer = avrorecords.pop()
        if avrostats is _UNSET:
            avrostats = footer
            logging.info('avrostats: %s', avrostats)

        for avrorecord in avrorecords:
            writer.append(avrorecord)

    if avrostats is _UNSET:
        # Nothing usable was found; do not append a placeholder value that
        # would not match the schema.
        logging.warning('no avrostats record found in: %s', input_dir)
    else:
        writer.append(avrostats)
finally:
    # Always flush and close the output, even when merging fails part-way.
    writer.close()

logging.info('Writing finished. All done.')
|