欢迎投稿

今日深度:

实现Cassandra数据自动失效功能,cassandra自动失效

实现Cassandra数据自动失效功能,cassandra自动失效


原文链接: http://xulingbo.net

一、 Cassandra 数据失效机制

现在使用 Cassandra 的地方很多,由于 Cassandra 的写的性能很好,所以有一部分使用 Cassandra 做为类似于日志功能来使用,所
以一个需求就提出来了,那就是希望 Cassandra 能提供一个自动失效功能,希望 Cassandra 能保留一定天数后,能自动删除数据。

这种需求的确很常见,但是遗憾的是Cassandra 目前仍然不能满足这个需求,虽然Cassandra 已经提供了实现这个功能的基础,下面详细看一下Cassandra 是怎么删除数据的:在我写的另一篇文章中也介绍了Cassandra 删除数据的规则《Cassandra 分布式数据库详解》系列文档。

Cassandra 判断数据是否有效有三个地方,分别是下面代码片段:

第一段代码

long maxChange = column.mostRecentLiveChangeAt();

return (!column.isMarkedForDelete() || column.getLocalDeletionTime() >
gcBefore || maxChange > column.getMarkedForDeleteAt()) // (1)

&& (!container.isMarkedForDelete() || maxChange >
container.getMarkedForDeleteAt()); //(2)

这段代码判断这个列是否应该被关联,关联有两个条件

(1) 列没有被删除或者删除的时间在有效期以内或者删除的时间在最后修改数据的数据之前

(2) 列所在的容器没有被删除或者列的修改时间在容器删除时间之后

第二段代码

for (byte[] cname : cf.getColumnNames())

{

IColumn c = cf.getColumnsMap().get(cname);

long minTimestamp = Math.max(c.getMarkedForDeleteAt(),
cf.getMarkedForDeleteAt());

for (IColumn subColumn : c.getSubColumns())

{

if (subColumn.timestamp() <= minTimestamp|| (subColumn.isMarkedForDelete() && subColumn.getLocalDeletionTime()<= gcBefore))

{

((SuperColumn)c).remove(subColumn.name());

}

}

if (c.getSubColumns().isEmpty() && c.getLocalDeletionTime() <=
gcBefore)

{

cf.remove(c.name());

}

}

for (byte[] cname : cf.getColumnNames())

{

IColumn c = cf.getColumnsMap().get(cname);

if ((c.isMarkedForDelete() && c.getLocalDeletionTime() <= gcBefore)

|| c.timestamp() <= cf.getMarkedForDeleteAt())

{

cf.remove(cname);

}

}

if (cf.getColumnCount() == 0 && cf.getLocalDeletionTime() <=
gcBefore)

{

return null;

}

第二段代码是判断数据是否应该被删除,也有两个或条件

(1) 列已经被删除了并且数据已经过了时效期

(2) 数据的修改时间在容器删除时间之前

当所有列都删除并且容器已失效,这个 key 就会被删除, key 的删除是在 SSTable 合并的时候完成的

第三段代码是在客户端中

for (IColumn column : columns)

{

if (column.isMarkedForDelete()){

continue;

}

Column thrift_column = new Column(column.name(), column.value(),
column.timestamp());

thriftColumns.add(createColumnOrSuperColumn_Column(thrift_column));

}

这段代码清楚的说明只要列被删除,客户端将取不到数据

关于 gcBefore 是在配置文件中设置的 864000 ,很多人以为这个时间就是数据的失效时间,以为在这个时间段内数据可以被使用,从上面的三段代码来看, Cassandra 在设计数据失效机制,在实际应用中几乎没有利用价值,也就是数据失效对使用者来说没有任何好处

二、 改造 Cassandra 实现真正的自动失效功能

从上分析 Cassandra 判断数据是否失效主要根据三个标识

(1) Column 是否被删除标识: isMarkedForDelete

(2) Column getLocalDeletionTime() ,这个时间就是个失效时间 GCGraceSeconds 比较,判断该 Column 是否已经失效,这个和前面的条件是并集

(3) Column 和所在 ColumnFamily 的删除时间 markedForDeleteAt 比较如果小于这个时间,改 Column 就会被删除

所以我们要实现自动失效数据,不用通过调用 remove 接口,就能实现。也就是当数据在写到数据库之前,就标识这个数据在 GCGraceSeconds 时间内有效,一旦超过这个时间,数据被被自动删除,而且是物理删除。

需要改造的地方如下:

A. 改造 org.apache.cassandra.thrift. ColumnPath 类,增加一个

public boolean is_delete;

属性 , 当我们在调用 insert 接口是标识这个 Column 数据是自动失效的。如下所示:

ColumnPath col = new
ColumnPath(columnFamily,superColumn,column.getBytes(“UTF-8″),
true );

client.insert(keyspace,key,col,value.getBytes(),System.currentTimeMillis(),
ConsistencyLevel.ONE);

B. 改造 org.apache.cassandra.thrift. Column

也增加同样增加 is_delete 属性,当我们调用 batch_insert batch_mutate 接口是同样可以设置 Column 时支持自动失效的。

A B 是客户端接口需要修改的地方,下面是服务器端要修改的地方:

C. org.apache.cassandra.db.filter. QueryPath

也增加同样增加 is_delete 属性,用来保存客户端传过来的 is_delete 值。并增加一个结构体:

public QueryPath(String columnFamilyName, byte[] superColumnName, byte[] columnName,boolean is_delete) {
this.columnFamilyName = columnFamilyName;
this.superColumnName = superColumnName;
this.columnName = columnName;
this.is_delete = is_delete;

}

insert batch_insert batch_mutate 三个接口创建 QueryPath 对象的地方改成上面这个结构体创建对象

D. org.apache.cassandra.db. ColumnFamily

修改 addColumn 方法,将 false 改为 path.is_delete

public void addColumn(QueryPath path, byte[] value, long timestamp)

{
addColumn(path, value, timestamp,
path.is_delete );
//addColumn(path, value, timestamp,
false );

}

E. org.apache.cassandra.db. Column

修改 getLocalDeletionTime 方法,直接去 timestamp 时间

public int getLocalDeletionTime()

{
assert isMarkedForDelete;
//return ByteBuffer.wrap(value).getInt();

return (int)(timestamp/1000);

}

同时修改 comparePriority 方法,改变 Column 替换规则

public long comparePriority(Column o)

{
if(o.timestamp == -1){
return -1;

}
if(this.timestamp == -1){
return 1;

}

if(isMarkedForDelete)

{
// tombstone always wins ties.
return timestamp < o.timestamp ? -1 : 1;

}
return timestamp – o.timestamp;

}

F.
org.apache.cassandra.db. RowMutation

修改 delete 方法

public void delete(QueryPath path, long timestamp)

{
assert path.columnFamilyName != null;
String cfName = path.columnFamilyName;

int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
ColumnFamily columnFamily = modifications_.get(cfName);

if (columnFamily == null)
columnFamily = ColumnFamily.create(table_, cfName);

if (path.superColumnName ==
null && path.columnName == null)

{
columnFamily.delete(localDeleteTime, timestamp);

}

else
if (path.columnName == null)

{
SuperColumn sc = new SuperColumn(path.superColumnName,
DatabaseDescriptor.getSubComparator(table_, cfName));
sc.markForDeleteAt(localDeleteTime, timestamp);
columnFamily.addColumn(sc);

}

else

{

ByteBuffer bytes = ByteBuffer.allocate(4);
bytes.putInt(localDeleteTime);
long deleteTime = -1;
//columnFamily.addColumn(path, bytes.array(), timestamp, true);
columnFamily.addColumn(path,
bytes.array(), deleteTime, true);

}

modifications_.put(cfName, columnFamily);

}

调用删除接口时将删除时间设为 -1 ,这个和前面的修改的 comparePriority 方法相适应。

G. 修改 CassandraServer 类的 thriftifyColumns thriftifySubColumns

却掉 isMarkedForDelete 检查

public List<ColumnOrSuperColumn>
thriftifyColumns(Collection<IColumn> columns, boolean reverseOrder)

{
ArrayList<ColumnOrSuperColumn> thriftColumns = new
ArrayList<ColumnOrSuperColumn>(columns.size());

for
(IColumn column : columns)

{
/*if (column.isMarkedForDelete())
{

continue;
}*/


Column thrift_column = new Column(column.name(), column.value(),
column.timestamp());
thriftColumns.add(createColumnOrSuperColumn_Column(thrift_column));

}

//we have to do the reversing here, since internally we pass results around in
ColumnFamily

//
objects, which always sort their columns in the “natural” order

//TODO this is inconvenient for direct users of StorageProxy

if (reverseOrder)
Collections.reverse(thriftColumns);
return thriftColumns;

}

数据有效时间可以通过 GCGraceSeconds 配置项来设置,这样超过 gcBefore 时间,客户端就会取不到数据,并且 Cassandra 在执行 SSTable 合并的时候会执行物理删除,如果想立即删除数据可以调用 remove 接口,数据将会立即被删除,但是在有效时间内被删除的数据客户端仍然能够取到数据。这样就真正实现了数据自动失效和删除的功能。

www.htsjk.Com true http://www.htsjk.com/cassandra/31587.html NewsArticle 实现Cassandra数据自动失效功能,cassandra自动失效 原文链接: http://xulingbo.net 一、 Cassandra 数据失效机制 现在使用 Cassandra 的地方很多,由于 Cassandra 的写的性能很好,所以有一部分使用...
相关文章
    暂无相关文章
评论暂时关闭