欢迎投稿

今日深度:

hive2solr问题小结,hive2solr小结

hive2solr问题小结,hive2solr小结


  搞了一段时间,hive2solr的job终于可以稳定的跑了,实现使用hive向solr插数据,主要是实现RecordWriter接口,重写write方法和close方法。下面对遇到的问题一一列出:

1.数据覆盖问题,使用原子更新
参考:http://caiguangguang.blog.51cto.com/1652935/1599137
2.重复构建solrserver和solrtable对象问题,使用static在初始化的时候构建,后面直接调用
构建:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20         public static Map<Integer,SolrServer> solrServers = new HashMap<Integer,SolrServer>();         public static Map<Integer,SolrTable> solrTables = new HashMap<Integer,SolrTable>();         public static String[] iparray;         public static String ipstring;         public static String collec;         static {                LOG .warn("in SolrServerCustom start initialize ip maps" );                ipstring = "xxxx,xxxxxx";                collec = "userinfo" ;                LOG .warn("in SolrServerCustom  ipstring and collec: " + ipstring + "," + collec );                iparray = ipstring .split("," );               Arrays. sort( iparray);                for (int i=0;i< iparray. length;i++){                      String urlx = "http://" +iparray [i]+"/solr/" + collec;                       solrServers.put(i, new HttpSolrServer(urlx));                       solrTables.put(i, new SolrTable(String.valueOf(i)));               }                LOG .warn("in SolrServerCustom end initialize ip maps,maps size " + solrServers .size());                LOG .warn("in SolrServerCustom end initialize ip mapsx,mapsx size " + solrTables .size());         }

引用:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31  public void write(Writable w) throws IOException {           MapWritable map = (MapWritable) w;           SolrInputDocument doc = new SolrInputDocument();           String key;           String value;           String newkey;           int idx;           for (final Map.Entry<Writable, Writable> entry : map.entrySet()) {                key = entry.getKey().toString();                newkey = this.tableName + "." + entry.getKey().toString();                value = entry.getValue().toString();                if(key.equals("id")){                     idx = SolrUtil.getIntServer(value,SolrServerCustom.solrServers); //引用静态属性SolrServerCustom.solrServers                     table = SolrServerCustom.solrTables.get(idx); //引用静态属性SolrServerCustom.solrTables                     table.setNumInputBufferRows(this.numInputBufferRows);                }                if(key.equals("id")){                     doc.addField("id",Integer.valueOf(value));                }else{                     if (value.equals("(null)")){                          value = "";                     }                     setOper = new LinkedHashMap<String,Object>();                     setOper.put("set",value);                     if(!doc.keySet().contains(newkey)){                          doc.addField(newkey, setOper);                     }                    }           }           table.save(doc);      }

3.代码存在内存泄露问题
1)对象的声明,放在循环外,并调整outbuffer的大小
现象:yarn map/reduce  java heap满导致job hang

错误日志:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 2015-01-26 14:01:10,000 FATAL [main] org.apache.hadoop.mapred.YarnChild: Error running child : java.lang.OutOfMemoryError: GC overhead limit exceeded         at java.lang.AbstractStringBuilder.<init>(AbstractStringBuilder.java:45)         at java.lang.StringBuilder.<init>(StringBuilder.java:68)         at com.chimpler.hive.solr.SolrWriter.write(SolrWriter.java:71)         at org.apache.hadoop.hive.ql.exec.FileSinkOperator.processOp(FileSinkOperator.java:621)         at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:793)         at org.apache.hadoop.hive.ql.exec.SelectOperator.processOp(SelectOperator.java:87)         at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:793)         at org.apache.hadoop.hive.ql.exec.TableScanOperator.processOp(TableScanOperator.java:92)         at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:793)         at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:540)         at org.apache.hadoop.hive.ql.exec.mr.ExecMapper.map(ExecMapper.java:177)         at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)         at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:428)         at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)         at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:160)         at java.security.AccessController.doPrivileged(Native Method)         at javax.security.auth.Subject.doAs(Subject.java:396)         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)               at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:155)

2)try...catch....finally的使用(在finally中 clear buffer)
一开始没有增加finally,导致在异常发生时buffer会大于设置,最终导致job内存用满,hang住。

4.异常的处理
要求一个solrserver出错,或者solr暂时不响应时程序不能退出,默认情况下异常向上抛出,最终导致job失败
比如:

1 2 3 4 5 6 7 Caused by: org.apache.solr.client.solrj.impl.HttpSolrServer$RemoteSolrException: Expected content type application/octet-stream but got text/html. <html> <head><title>504 Gateway Time-out</title></head> <body bgcolor="white"> <center><h1>504 Gateway Time-out</h1></center> <hr><center>nginx/1.6.2</center> </body> </html>

 防止异常的抛出会造成runtime error导致job失败,catch异常后不做处理

1 2 3 4 5 6 7 8 9 10 11      public void flush(){           try {                if (!outputBuffer.isEmpty()) {                     server.add(outputBuffer);                }           } catch(Exception e){                LOG.warn("solrtable add error,Exception log is " + e);           }finally{                outputBuffer.clear(); //在finally中清除buffer,否则会导致buffer在异常抛出时一直递增导致jvm oom的问题           }      }

5.commit问题,调用close方法时,只有最后一个solrtable会close,开始时使用每插入一行就commit的方式,但是这种性能很差(大约50%的降低),后来在solrserver端控制commit
solrconfig.xml:

1 2 3 4 5 6 7 8 9 10      <autoCommit>        <!--<maxTime>${solr.autoCommit.maxTime:15000}</maxTime>-->          <maxDocs>15000</maxDocs//当内存索引数量达到指定值的时候,将内存的索引DUMP到硬盘中,并通知searcher类加载新的索引         <maxTime>1000</maxTime//每隔指定的时间段,自动的COMMIT内存中的索引数据,并通知Searcher类加载新的索引,以最先达到条件执行为准        <openSearcher>true</openSearcher>  //设置为false时,虽然commit会导致index的变更flush到磁盘上,但是客户端不会看到更新      </autoCommit>           <autoSoftCommit>        <maxTime>${solr.autoSoftCommit.maxTime:10000}</maxTime>      </autoSoftCommit>

这里autoCommit是指hard commit,如果不使用autoCommit也可以在add document时带上commitWithin的参数autoSoftCommit和autoCommit类似,但是它是一个solf类型的commit,可以确保数据可见但是没有把数据flush到磁盘,机器crash会导致数据丢失。
save也导致性能损耗,save会消耗6ms左右的时间,需要放到一个list中进行save操作(batch操作)


6.outbuffer的问题
初始的代码,因为对用solrtable来说只有一个入口(solrcloud时也一样),这样solrtable只有一个实例,这里用到了静态变量,每个solrtable不能按自己的buffer进行操作
改成非静态变量,并且使用静态代码块初始化table和server,放到一个hashmap中,用的时候去取,保证只有几个的实例。否则如果在使用时进行实例化,每次的对象都不同,导致buffer一直为1。

7.close的问题
如果设置了buffer,可能会导致不能flush

1 2 3 4 5 6 public void save(SolrInputDocument doc) {      outputBuffer.add(doc); //使用save放到buffer list中      if (outputBuffer.size() >= numOutputBufferRows) { //只有list的大小>=设置的buffer大小时才会触发flush的操作          flush();      } }

而flush中会调用server.add(outputBuffer)操作。filesink关闭时调用SolrWriter.close
调用SolrTable的commit(commit中调用flush和server.commit),发现只有最后一个table实例会调用commit.
解决方法,在SolrWriter.close中循环调用SolrTable.commit方法:

1 2 3 4 5 6 7 8 9 10 public void close(boolean abort) throws IOException {      if (!abort) {          Map<Integer,SolrTable> maps = new SolrServerCustom().solrTable;          for(Map.Entry<Integer, SolrTable> entry:maps.entrySet()){              entry.getValue().commit();          }      else {          table.rollback();      } }

8.锁的问题,从nginx端看到大量的302 ,solr日志看到有锁的问题,调整参数,在solr启动时释放锁
solr端日志:

1 userinfo: org.apache.solr.common.SolrException:org.apache.solr.common.SolrException: Index locked for write for core userinfo

解决:solrconfig.xml中设置

1 <unlockOnStartup>true</unlockOnStartup>

原因:

org.apache.solr.core.SolrCore初始化时使用IndexWriter.isLocked(dir)判断是否加锁,如果已经加了锁,则分为两种情况,一种是在solrconfig.xml中配置了unlockOnStartup,会尝试unlock,如果没有配置unlockStartup,则会抛出Index locked for write for core异常

根据堆栈可以看对应代码:
org.apache.solr.core.SolrCore 构造函数中会调用initIndex方法:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43   void initIndex(boolean reload) throws IOException {       String indexDir = getNewIndexDir();       boolean indexExists = getDirectoryFactory().exists(indexDir);       boolean firstTime;       synchronized (SolrCore.class) {         firstTime = dirs.add(getDirectoryFactory().normalize(indexDir));       }       boolean removeLocks = solrConfig.unlockOnStartup; // unlockOnStartup = getBool(indexConfigPrefix+"/unlockOnStartup", false); 默认为false       initIndexReaderFactory();       if (indexExists && firstTime && !reload) {                  Directory dir = directoryFactory.get(indexDir, DirContext.DEFAULT,             getSolrConfig().indexConfig.lockType);         try {           if (IndexWriter.isLocked(dir)) {             if (removeLocks) {               log.warn(                   logid                       "WARNING: Solr index directory '{}' is locked.  Unlocking...",                   indexDir);               IndexWriter.unlock(dir); //解锁             else {               log.error(logid                   "Solr index directory '{}' is locked.  Throwing exception",                   indexDir);               throw new LockObtainFailedException(                   "Index locked for write for core " + name);             }                        }         finally {           directoryFactory.release(dir);         }       }       // Create the index if it doesn't exist.       if(!indexExists) {         log.warn(logid+"Solr index directory '" new File(indexDir) + "' doesn't exist."                 " Creating new index...");         SolrIndexWriter writer = SolrIndexWriter.create("SolrCore.initIndex", indexDir, getDirectoryFactory(), true,                                                         getLatestSchema(), solrConfig.indexConfig, solrDelPolicy, codec);         writer.close();       }   }

9.tomcat的配置导致的问题,每台机器两个solr实例,其中一个一直不能启动(在实例化core时会尝试获取锁,这里获取锁失败,可以手动删除write.lock)
最终发现是两个tomcat写到了一个solr目录里面

错误日志:

1 2 3 4 5 6 7 8 9 Caused by: org.apache.lucene.store.LockObtainFailedException: Lock obtain timed out: NativeFSLock@/apps/dat/web/working/solr/cloud/storage/data/userinfo/data/index/write.lock      at org.apache.lucene.store.Lock.obtain(Lock.java:89)      at org.apache.lucene.index.IndexWriter.<init>(IndexWriter.java:710)      at org.apache.solr.update.SolrIndexWriter.<init>(SolrIndexWriter.java:77)      at org.apache.solr.update.SolrIndexWriter.create(SolrIndexWriter.java:64)      at org.apache.solr.update.DefaultSolrCoreState.createMainIndexWriter(DefaultSolrCoreState.java:267)      at org.apache.solr.update.DefaultSolrCoreState.getIndexWriter(DefaultSolrCoreState.java:110)      at org.apache.solr.core.SolrCore.openNewSearcher(SolrCore.java:1513)      ... 12 more

10.部分job运行缓慢,其中一个job运行了11个小时。。
原因:
数据写入时发生在mapoperator或者reduceoperator中,多少个map或者reduce就是多少个并发线程写入。job只有一个reduce,导致写入缓慢,调整reduce的数量到100(set mapreduce.job.reduces=100)后,性能大幅度提升,3kw数据导入时间由40916s下降到993s。



本文转自菜菜光 51CTO博客,原文链接:http://blog.51cto.com/caiguangguang/1612601,如需转载请自行联系原作者

www.htsjk.Com true http://www.htsjk.com/solr/11100.html NewsArticle hive2solr问题小结,hive2solr小结   搞了一段时间,hive2solr的job终于可以稳定的跑了,实现使用hive向solr插数据,主要是实现RecordWriter接口,重写write方法和close方法。下面对遇到的问题一一...
评论暂时关闭