Cassandra是一套开源分布式NoSQL数据库系统。它最初由Facebook开发,用于储存收件箱等简单格式数据,集GoogleBigTable的数据模型与Amazon Dynamo的完全分布式的架构于一身Facebook于2008将 Cassandra 开源,此后,由于Cassandra良好的可扩展性,被Digg、Twitter等知名Web 2.0网站所采纳,成为了一种流行的分布式结构化数据存储方案。
1.Cassandra特点弹性可扩展性 - Cassandra是高度可扩展的; 它允许添加更多的硬件以适应更多的客户和更多的数据根据要求。
始终基于架构 - Cassandra没有单点故障,它可以连续用于不能承担故障的关键业务应用程序。
快速线性性能 - Cassandra是线性可扩展性的,即它为你增加集群中的节点数量增加你的吞吐量。因此,保持一个快速的响应时间。
灵活的数据存储 - Cassandra适应所有可能的数据格式,包括:结构化,半结构化和非结构化。它可以根据您的需要动态地适应变化的数据结构。
便捷的数据分发 - Cassandra通过在多个数据中心之间复制数据,可以灵活地在需要时分发数据。
快速写入 - Cassandra被设计为在廉价的商品硬件上运行。 它执行快速写入,并可以存储数百TB的数据,而不牺牲读取效率。
数据写入操作密集
数据修改操作很少
通过主键查询
需要对数据进行分区存储
存储日志型数据
类似物联网的海量数据
对数据进行跟踪
docker安装cassandra官方快速教程 : https://cassandra.apache.org//quickstart.html
cassandra镜像: https://hub.docker.com//cassandra
docker run -p 9042:9042 --rm -d --name cassandra --hostname cassandra --network test cassandra:4.0创建创建客户端连接,建立一个测试的表
-- 启动客户端连接
docker run --rm -it --network test nuvo/docker-cqlsh cqlsh cassandra 9042 --cqlversion='3.4.5'
建库建表
-- Create a keyspace
CREATE KEYSPACE IF NOT EXISTS store WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : '1' };
-- Create a table
CREATE TABLE IF NOT EXISTS store.shopping_cart (
userid text PRIMARY KEY,
item_count int,
update_time bigint
);
-- Insert some data
INSERT INTO store.shopping_cart (userid, item_count, update_time) VALUES ('9876', 2, 1644204495000);
INSERT INTO store.shopping_cart (userid, item_count, update_time) VALUES ('1234', 5, 1644204495230);
三.CQL教程
Cassandra使用的是CQL。CQL 全称为 Cloud Query Language,是 LeanCloud 为查询 API 定制的一套类似 SQL 查询语法的子集和变种,其目的是让开发者可以使用传统的 SQL 语法来查询 LeanCloud 云端数据,从而减少学习 LeanCloud 查询 API 的成本
1.语法学习 基本语法:https://blog.csdn.net/y6622576/article/details/102728136
https://blog.csdn.net/itcast_cn/article/details/107559499
https://cassandra.apache.org/doc/latest/cassandra/cql/index.html
2.区别于sql不支持在 select 中使用 as 关键字为列增加别名。
update 和 delete 不提供批量更新和删除,只能根据 objectId(where objectId=xxx)和其他条件来更新或者删除某个文档。
不支持 join,关联查询提供 include、relatedTo 等语法来替代(关系查询)。
仅支持部分 SQL 函数(内置函数)。
不支持 group by、having、max、min、sum、distinct 等分组聚合查询语法。
不支持事务。
不支持锁。
1.cql的查询,where条件后面只支持索引查询
四.springboot快速整合Cassandrademo项目地址:https://gitee.com/lk0423/gradle-study-demo
1.建立springboot项目,这里使用gradle进行构建。下面是完整的gradle依赖构建文件plugins {
id 'org.springframework.boot' version '2.1.6.RELEASE'
id 'java'
}
apply plugin: 'io.spring.dependency-management' //应用的插件
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'
repositories { //远程仓库,根据先后顺序,决定优先级
mavenLocal()
mavenCentral()
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
implementation 'com.codahale.metrics:metrics-core:3.0.2'
// cassandra
implementation 'com.datastax.cassandra:cassandra-driver-core:3.8.0'
implementation 'com.datastax.cassandra:cassandra-driver-mapping:3.8.0'
// lombok
annotationProcessor 'org.projectlombok:lombok:1.18.22'
compileonly 'org.projectlombok:lombok:1.18.22'
}
2.配置连接Cassandra
package lk.config;
import com.datastax.driver.core.*;
import com.datastax.driver.core.policies.DefaultRetryPolicy;
import lombok.Getter;
import lombok.experimental.Delegate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Repository;
@Slf4j
@Repository
public class NewCassandraClient {
@Getter
@Delegate(types = Session.class)
private Session session;
public NewCassandraClient(@Value("${cassandra.keyspace}") String keyspace,
@Value("${cassandra.newNodes}") String nodes,
@Autowired ApplicationContext applicationContext) {
// 获取当前配置的环境
String activeProfile = applicationContext.getEnvironment().getActiveProfiles()[0];
//默认走测试环境
boolean dev = true;
if (activeProfile.contains("pre") || activeProfile.contains("prod")) {
dev = false;
}
QueryOptions queryOption = new QueryOptions()
.setFetchSize(1000)
.setConsistencyLevel(dev ? ConsistencyLevel.ONE : ConsistencyLevel.QUORUM);
SocketOptions socketOptions = new SocketOptions()
.setConnectTimeoutMillis(20000)
.setReadTimeoutMillis(20000);
Cluster.Builder builder = Cluster.builder().withQueryOptions(queryOption)
.withSocketOptions(socketOptions)
.withRetryPolicy(DefaultRetryPolicy.INSTANCE);
Cluster cluster = builder.addContactPoints(nodes.split(",")).build();
session = cluster.connect(keyspace);
}
}
cassandra.keyspace=store
cassandra.newNodes=127.0.0.1
3.编写dao层
package lk.resposity.cassandra;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.querybuilder.BuiltStatement;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import lk.config.NewCassandraClient;
import lk.data.entity.ShoppingCartEntity;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import java.util.ArrayList;
import java.util.List;
@Repository
public class CassandraStoreDao {
private final String table = "shopping_cart";
@Autowired
private NewCassandraClient session;
public void add(ShoppingCartEntity shoppingCartEntity) {
Insert insert = QueryBuilder.insertInto(table)
.value("userid", shoppingCartEntity.getUserid())
.value("item_count", shoppingCartEntity.getItem_count())
.value("update_time", shoppingCartEntity.getUpdate_time());
session.executeAsync(insert);
}
public List getUserSoppingCart(String userId) {
BuiltStatement select = QueryBuilder.select()
.from(table)
.where(QueryBuilder.eq("userid", userId));
ResultSet resultSet = session.execute(select);
List ret = resultSet.all();
List result = new ArrayList<>();
for (Row r : ret) {
result.add(new ShoppingCartEntity(r.getString("userid"), r.getInt("item_count"), r.getLong("update_time")));
}
return result;
}
}
package lk.data.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ShoppingCartEntity {
private String userid;
private Integer item_count;
private Long update_time;
}
五.实战项目中的使用
具体dao层代码可以参考 tiku-ai-practice 项目中的 outfox.course.tiku.resposity.cassandra 下的各个dao的写法
添加// 单条插入
Insert insert = QueryBuilder.insertInto(tableName).value("undos",entity.getUnDoPools());
session.executeAsync(insert)
// 批量插入
BatchStatement batch = new BatchStatement();
for(UserSectionEntity entity: entities){
if(entity != null){
Insert insert = QueryBuilder.insertInto(tableName).value("undos",entity.getUnDoPools());
batch.add(insert);
}
}
session.executeAsync(batch);
删除
BuiltStatement delete = QueryBuilder.delete().from(tableName).where(QueryBuilder.eq("userId", userId));
session.executeAsync(delete);
更新
BuiltStatement statement = QueryBuilder.update(tableName)
.with(QueryBuilder.set("historys",history))
.where(QueryBuilder.eq("userId", userId))
.and(QueryBuilder.eq("subjectId",subjectId))
.and(QueryBuilder.eq("chapterId", chapterId==subjectId?-1:chapterId))
.and(QueryBuilder.eq("sectionId", knowledgeId==chapterId?-1:knowledgeId));
session.execute(statement);
查询(查出来的结果集,需要自行遍历筛选封装为需要的dto)
BuiltStatement statement = QueryBuilder.select().all().from(tableName).where(QueryBuilder.eq("userId", userId));
ResultSet resultSet = session.execute(statement);
List entities = new ArrayList<>();
List ret = resultSet.all().parallelStream().filter((r)->r.getLong("sectionId")==-1).collect(Collectors.toList());
for (Row r : ret) {
entities.add(new UserSectionEntity(userId,
r.getLong("subjectId"),
r.getLong("chapterId"),
r.getLong("sectionId"),
r.getLong("time"),
r.getInt("status"),
r.getDouble("savedPower"),
r.getDouble("realPower"),
r.getInt("generation"),
r.getList("historys",Long.class),
r.getList("undos",Long.class)));
}
return entities;



