欢迎投稿

今日深度:

Elasticsearch批量插入数据,elasticsearch插入

Elasticsearch批量插入数据,elasticsearch插入


1. 创建本地TransportClient:

    static TransportClient client = null;
	
	private static Logger logger = LoggerFactory.getLogger(ESUtils.class);
	
	public final static String HOST = "localhost";
	
	public final static int PORT = 9300;
	
	static Settings settings = Settings.builder().put("cluster.name","elasticsearch")
			.put("client.transport.sniff",true)
			.put("transport.type","netty3")
			.put("http.type", "netty3")
			.build();
	static {
		getConnection();
	}
	/**
	 * Creates the shared {@link TransportClient} and points it at {@link #HOST}:{@link #PORT}.
	 *
	 * <p>If the host name cannot be resolved, the failure is logged and {@code client}
	 * is left unchanged (it stays {@code null} on the first call). Previously the code
	 * printed the stack trace and then dereferenced the still-null client in the debug
	 * statement, so a resolution failure surfaced as a NullPointerException.
	 */
	@SuppressWarnings({ "resource", "unchecked" })
	public static void getConnection() {
		try {
			client = new PreBuiltTransportClient(Settings.EMPTY)
					.addTransportAddress(new TransportAddress(InetAddress.getByName(HOST), PORT));
		} catch (UnknownHostException e) {
			// Keep the cause in the log and skip the debug line below, which needs a client.
			logger.error("Elasticsearch host could not be resolved: {}", HOST, e);
			return;
		}
		// Parameterized form avoids building the string when debug logging is disabled.
		logger.debug("Elasticsearch connect info:{}", client);
	}
	/** Closes the shared transport client if one exists; does nothing when it is {@code null}. */
	public void closeClient() {
		if (client == null) {
			return;
		}
		client.close();
	}
	/**
	 * Prints the shared client's string form to standard output.
	 *
	 * @param args command-line arguments (unused)
	 */
	public static void main(String[] args) {
		final String description = String.valueOf(client);
		System.out.println(description);
	}

2. 创建索引:

    public void addIndexByName(String index, String type, TransportClient client) throws IOException {
		XContentBuilder mapper = XContentFactory.jsonBuilder().startObject().startObject("settings")
				.startObject("index")
				.field("number_of_shards",3).field("number_of_replicas",0)
				.endObject().endObject()
				.startObject("mappings").startObject(type)
				.field("dynamic","false")
				.startObject("_all").field("enabled",false)
				.endObject()
				.startObject("properties")
				.startObject("mnc").field("type","keyword").field("ignore_above",128).endObject()
				.startObject("lac").field("type","keyword").field("ignore_above",128).endObject()
				.startObject("ci").field("type","keyword").field("ignore_above",128).endObject()
				.startObject("latitude").field("type","keyword").field("ignore_above",128).endObject()
				.startObject("longtitude").field("type","keyword").field("ignore_above",128).endObject()
				.startObject("uli").field("type","keyword").field("ignore_above",128).endObject()
				.startObject("area_code").field("type","keyword").field("ignore_above",128).endObject()
				.startObject("validity").field("type","keyword").field("ignore_above",128).endObject()
				.startObject("address").field("type","keyword").field("ignore_above",128).endObject()
				.startObject("province").field("type","keyword").field("ignore_above",128).endObject()
				.startObject("city").field("type","keyword").field("ignore_above",128).endObject()
				.startObject("district").field("type","keyword").field("ignore_above",128).endObject()
				.startObject("township").field("type","keyword").field("ignore_above",128).endObject()
				.startObject("acc").field("type","keyword").field("ignore_above",128).endObject()
				.endObject().endObject().endObject().endObject();
		System.out.println(mapper);
		CreateIndexRequestBuilder cirb = client.admin().indices().prepareCreate(index).setSource(mapper);
		CreateIndexResponse reponse = cirb.execute().actionGet();
		mapper.close();
		if(reponse.isAcknowledged()) {
			System.out.println("创建索引成功");
		}else {
			System.out.println("创建索引失败");
		}
	}

3. Model实体:

/**
 * Plain data holder for one cell record. Every property is a String, and the
 * property names are the same as the field names declared in the index mapping
 * built by addIndexByName (including the "longtitude" spelling).
 */
public class Cellinfo {

	private String mnc;
	private String lac;
	private String ci;
	private String latitude;
	// Spelled "longtitude" (not "longitude"); the index mapping uses the same spelling.
	private String longtitude;
	private String uli;
	private String area_code;
	private String validity;
	private String address;
	private String province;
	private String city;
	private String district;
	private String township;
	private String acc;
	/** Creates an instance with all properties unset (null). */
	public Cellinfo() {
		super();
		// TODO Auto-generated constructor stub
	}
	/** Returns the mnc value. */
	public String getMnc() {
		return mnc;
	}
	/** Sets the mnc value. */
	public void setMnc(String mnc) {
		this.mnc = mnc;
	}
	// The listing elides the accessors between mnc and acc with the placeholder below.
	...
	/** Returns the acc value. */
	public String getAcc() {
		return acc;
	}
	/** Sets the acc value. */
	public void setAcc(String acc) {
		this.acc = acc;
	}
	
    }

4. 批量插入数据:

    private void save(List<Cellinfo> list) {
		BulkRequestBuilder bulkRequest = ESUtils.client.prepareBulk();
		int count = 0;
		for(Cellinfo cell : list) {
			JSONObject json = JSONObject.fromObject(cell);
			bulkRequest.add(ESUtils.client.prepareIndex("cellinfo", "nx_type").setSource(json));
			
			if(++count%1000 == 0) {
				bulkRequest.execute().actionGet();
				bulkRequest = ESUtils.client.prepareBulk();
				System.out.println("插入-->"+1000+"--条!");
			}
			
		}
		bulkRequest.execute().actionGet();
		System.out.println("插入完毕,共计:"+count+"条...");
		
	}

5. Maven依赖:

<!-- https://mvnrepository.com/artifact/org.elasticsearch.client/transport -->
<!-- Elasticsearch transport client artifact. -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>6.2.4</version>
</dependency>
<!-- Logging dependencies -->
<!-- NOTE(review): this SLF4J binding is test-scoped and at 1.7.21, while slf4j-api
     below is 1.7.25 with default scope - confirm the scope and align the versions. -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.21</version>
    <scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core -->
<!-- NOTE(review): Log4j 2 core is declared here and Log4j 1.x (log4j:log4j) further
     down - confirm that both are really required. -->
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.8.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
<!-- NOTE(review): none of the code shown in this article references Gson; the bulk
     insert builds its JSON with JSONObject - confirm this dependency is needed. -->
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.0</version>
</dependency>
<!-- SLF4J API; LoggerFactory in the client snippet presumably comes from this artifact. -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.25</version>
</dependency>
<!-- Log4j 1.x. -->
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.14</version>
</dependency>
<!-- json-lib; presumably the source of JSONObject.fromObject used by the bulk insert. -->
<dependency>
    <groupId>net.sf.json-lib</groupId>
    <artifactId>json-lib</artifactId>
    <version>2.4</version>
    <classifier>jdk15</classifier>
</dependency>

www.htsjk.Com true http://www.htsjk.com/Elasticsearch/31000.html NewsArticle Elasticsearch批量插入数据,elasticsearch插入 1. 创建本地TransportClient:     static TransportClient client = null;private static Logger logger = LoggerFactory.getLogger(ESUtils.class);public final static String HOST = "loca...
相关文章
    暂无相关文章
评论暂时关闭