I am writing a file with the Hadoop API. I checked outputSessionData and it looks fine:
outputSessionData.saveAsNewAPIHadoopFile(outputDataDir, String.class, Map.class, PeopleSessionFormatter.class, ContextHandler.getInstance().getHadoopConf());
We are using HadoopFileSystemHandler:
FileSystem fs = HadoopFileSystemHandler.getInstance().getHadoopLocalFileSystem();
but in the end result I am getting CRC data in the file. Here is the output format class:
public class PeopleSessionFormatter extends FileOutputFormat<String, Map<String, ArrayList<PeopleSession>>> {

    private static Logger logger = Logger.getLogger(PeopleSessionFormatter.class.getName());

    @Override
    public RecordWriter<String, Map<String, ArrayList<PeopleSession>>> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        try {
            // get the output path configured for the job
            Path path = FileOutputFormat.getOutputPath(context);
            // build the full path: output directory plus our per-task filename
            Path fullPath = new Path(path, context.getTaskAttemptID().getTaskID().toString());
            // create the file in the (local) file system
            FileSystem fs = HadoopFileSystemHandler.getInstance().getHadoopLocalFileSystem();
            FSDataOutputStream fileOut = fs.create(fullPath);
            return new PeopleSessionWriter(fileOut, context.getConfiguration().get(Constant.DELIMITER1));
        } catch (Exception e) {
            logger.error("Failed to set record writer for output part files.", e);
            throw new IOException(e);
        }
    }
    @Override
    public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException, IOException {
        // Ensure that the output directory is set; only warn if it already exists.
        Path outDir = getOutputPath(job);
        if (outDir == null) {
            throw new InvalidJobConfException("Validation Output directory not set.");
        }
        // get delegation tokens for outDir's file system
        TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { outDir }, job.getConfiguration());
        if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
            logger.warn("Overwriting existing session files.");
        }
    }
}
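PeopleSessionWriter is our own RecordWriter; I have not pasted the full class, but it is roughly the following (a simplified, illustrative sketch, not the exact code):

public class PeopleSessionWriter extends RecordWriter<String, Map<String, ArrayList<PeopleSession>>> {

    private final DataOutputStream out;
    private final String delimiter;

    public PeopleSessionWriter(DataOutputStream out, String delimiter) {
        this.out = out;
        this.delimiter = delimiter;
    }

    @Override
    public void write(String key, Map<String, ArrayList<PeopleSession>> value)
            throws IOException, InterruptedException {
        // Illustrative only: join the key and the flattened sessions with DELIMITER1 ('|').
        out.writeBytes(key + delimiter + value + "\n");
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        out.close();
    }
}

And this is the ContextHandler that supplies the Hadoop Configuration passed to saveAsNewAPIHadoopFile: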
public class ContextHandler {

    // Fields.
    SparkConf sparkConf;
    JavaSparkContext sc;
    Configuration hadoopConf;

    // Singleton.
    private static ContextHandler instance;

    public static ContextHandler getInstance() throws Exception {
        if (instance == null) {
            instance = new ContextHandler();
        }
        return instance;
    }

    private ContextHandler() throws Exception {
        ValidationConfigHandler cfgHandler = OcConfigHandler.getInstance().getOcConfigurations();
        // Create the Spark configuration.
        sparkConf = new SparkConf();
        sparkConf.setAppName(Constant.APPLICATION_NAME);
        sparkConf.set(Constant.SPARK_EXECUTOR_EXTRA_JAVA_OPTIONS_KEY, cfgHandler.getSparkExecutorExtraJavaOptions());
        if (cfgHandler.getEnableKryoSerialization()) {
            sparkConf.set(com.kantarmedia.atriang.share.util.Constant.SPARK_SERIALIZER,
                    com.kantarmedia.atriang.share.util.Constant.KRYO_SERIALIZER);
            sparkConf.set(com.kantarmedia.atriang.share.util.Constant.KRYO_REGISTRATION_REQUIRED,
                    com.kantarmedia.atriang.share.util.Constant.TRUE);
            String krySerializationClasses = com.kantarmedia.atriang.share.util.Constant.KRYO_SERIALIZATION_CLASSES;
            String[] classNameArray = krySerializationClasses
                    .split(com.kantarmedia.atriang.share.util.Constant.COMMA_SEPERATOR);
            // Register every configured class with Kryo.
            int i = 0;
            Class<?>[] classArray = new Class<?>[classNameArray.length];
            for (String className : classNameArray) {
                classArray[i++] = Class.forName(className);
            }
            sparkConf.registerKryoClasses(classArray);
        }
        // Hadoop configuration handed to saveAsNewAPIHadoopFile.
        hadoopConf = new Configuration();
        hadoopConf.set(Constant.DELIMITER1, cfgHandler.getDelimiter1());
    }

    public JavaSparkContext getSparkContext() {
        if (sc == null) {
            sc = new JavaSparkContext(sparkConf);
        }
        return sc;
    }

    public void closeSparkContext() {
        if (sc != null) {
            sc.close();
        }
    }

    public SparkConf getConf() {
        return sparkConf;
    }

    public void tearDown() {
        closeSparkContext();
    }

    public Configuration getHadoopConf() {
        return hadoopConf;
    }
}
The file is written in .psv format, and DELIMITER1 is the '|' symbol.
I do not want the crc^@^@^@^B^@ bytes to end up in the .psv output file.
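I suspect the CRC bytes come from the checksum layer of Hadoop's LocalFileSystem (a ChecksumFileSystem writes "crc" magic bytes plus checksum data into .crc side files next to the data it creates). What I am considering, but have not confirmed as the right fix, is disabling checksum output before creating the part file inside getRecordWriter, along these lines (HadoopFileSystemHandler is our own wrapper; here I spell it out with the plain Hadoop API):

// Sketch only (untested): get the local file system and turn off checksum
// writing so no .crc data is produced alongside the .psv part files.
LocalFileSystem localFs = FileSystem.getLocal(context.getConfiguration());
localFs.setWriteChecksum(false);   // LocalFileSystem is a ChecksumFileSystem; this disables the .crc side files
// or, alternatively, bypass the checksum layer entirely:
// FileSystem rawFs = localFs.getRawFileSystem();
FSDataOutputStream fileOut = localFs.create(fullPath);

Is this the right way to avoid the CRC data, or is there a better approach with this kind of custom FileOutputFormat?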