2024-12-12 10:25:24 +08:00

553 lines
19 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using MessagePack;
using Newtonsoft.Json;
using StackExchange.Redis;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Drawing.Printing;
using System.IO;
using System.Linq;
using System.Linq.Dynamic.Core;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using YunDa.ISAS.Redis.Configuration;
using YunDa.ISAS.Redis.Factory;
using MessagePack;
using MessagePack.Resolvers;
using System.Diagnostics;
using System.Text;
using MongoDB.Bson;
using Microsoft.AspNetCore.SignalR;
using MessagePack.Formatters;
namespace YunDa.ISAS.Redis.Repositories
{
/// <summary>
/// Redis-backed repository that stores <typeparamref name="TEntity"/> values as
/// MessagePack payloads in Redis strings, lists and hashes, and can publish or
/// subscribe to them over Redis pub/sub.
/// </summary>
/// <remarks>
/// Most methods return a neutral value (false, 0, null or default) instead of
/// throwing when the key is null/empty or no database is available. Several public
/// method names contain historical typos (<c>InsertOrUpdateoneAsync</c>,
/// <c>DeleteHashKeiesAsync</c>, <c>ListLenghtAsync</c>); they are kept as-is because
/// they are part of the implemented interface.
/// </remarks>
/// <typeparam name="TEntity">Type of the stored entity.</typeparam>
/// <typeparam name="TPrimaryKey">Primary key type required by the interface; not used by this class.</typeparam>
public class RedisRepository<TEntity, TPrimaryKey> : IRedisRepository<TEntity, TPrimaryKey>
{
    private readonly IRedisClientFactory _redisClientFactory;
    private readonly IRedisConfiguration _redisConfiguration;
    private readonly IDatabase _database;
    private readonly ISubscriber _subscriber;

    // Shared serializer options: a composite resolver with no custom formatters
    // (the empty array is the slot where one would be registered), trying the
    // contractless resolver (types without MessagePack attributes) first and the
    // standard resolver (attributed types) second.
    private static readonly MessagePackSerializerOptions options = MessagePackSerializerOptions.Standard
        .WithResolver(CompositeResolver.Create(
            new IMessagePackFormatter[] { },
            new IFormatterResolver[] {
                ContractlessStandardResolver.Instance,
                StandardResolver.Instance
            }
        ));

    /// <summary>
    /// Creates the repository and resolves the database and subscriber from the factory.
    /// </summary>
    /// <param name="redisClientFactory">Factory that supplies the Redis database and subscriber.</param>
    /// <param name="redisConfiguration">Redis configuration; stored but not otherwise used here.</param>
    public RedisRepository(IRedisClientFactory redisClientFactory, IRedisConfiguration redisConfiguration)
    {
        _redisClientFactory = redisClientFactory;
        _redisConfiguration = redisConfiguration;
        _database = _redisClientFactory.InstanceRedisDatabase();
        _subscriber = _redisClientFactory.Subscriber;
    }

    /// <summary>
    /// Stores <paramref name="entity"/> under <paramref name="key"/> as a Redis string.
    /// Same behavior as <see cref="InsertOrUpdateOneAsync"/>; kept for callers that use this spelling.
    /// </summary>
    /// <param name="entity">Entity to serialize and store.</param>
    /// <param name="key">Redis key.</param>
    /// <param name="ttl">Optional expiry; null stores the value without one.</param>
    /// <returns>True when the value was set; false for an invalid key or missing database.</returns>
    public Task<bool> InsertOrUpdateoneAsync(TEntity entity, string key, TimeSpan? ttl = null)
    {
        return InsertOrUpdateOneAsync(entity, key, ttl);
    }

    /// <summary>
    /// Serializes <paramref name="entity"/> and publishes it on a literal (non-pattern) channel.
    /// </summary>
    /// <param name="channel">Channel name.</param>
    /// <param name="entity">Entity to publish.</param>
    /// <returns>The value reported by the subscriber's publish call; 0 when the channel is null/empty or no subscriber is available.</returns>
    public async Task<long> PublishAsync(string channel, TEntity entity)
    {
        if (string.IsNullOrEmpty(channel) || _subscriber == null)
        {
            return 0;
        }
        var payload = GetSerializeObjectString(entity);
        var redisChannel = new RedisChannel(channel, RedisChannel.PatternMode.Literal);
        return await _subscriber.PublishAsync(redisChannel, payload);
    }

    /// <summary>
    /// Raised for every message received on a subscribed pattern; arguments are the
    /// channel name and the deserialized entity.
    /// </summary>
    public event Action<string, TEntity> OnMessageReceived;

    /// <summary>
    /// Subscribes to all channels matching <paramref name="pattern"/> and forwards
    /// each message to <see cref="OnMessageReceived"/>.
    /// </summary>
    /// <param name="pattern">Channel pattern (pattern-mode subscription).</param>
    public void Subscribe(string pattern)
    {
        var redisChannel = new RedisChannel(pattern, RedisChannel.PatternMode.Pattern);
        _subscriber.Subscribe(redisChannel, (channel, message) =>
        {
            // Snapshot the event: invoking it directly throws when nobody is subscribed,
            // and there is no point deserializing a message nobody will receive.
            var handler = OnMessageReceived;
            if (handler == null)
            {
                return;
            }
            handler(channel, GetDeserialize<TEntity>(message));
        });
    }

    /// <summary>
    /// Replaces the whole list at <paramref name="key"/>: deletes the key, then
    /// left-pushes every item of <paramref name="value"/> in order.
    /// </summary>
    /// <param name="key">Redis list key.</param>
    /// <param name="value">New contents; null is treated as an empty list (the key is just deleted).</param>
    public void ListUpdateAll(string key, List<TEntity> value)
    {
        if (string.IsNullOrEmpty(key) || _database == null)
        {
            return;
        }
        // Serialize before touching Redis so a serialization failure cannot leave
        // the key deleted with nothing written back.
        var payloads = new List<RedisValue>();
        if (value != null)
        {
            foreach (var item in value)
            {
                payloads.Add(GetSerializeObjectString(item));
            }
        }
        // The lock only coordinates with other callers in this process that also
        // lock on the same database object (e.g. ListGet); it is not a Redis transaction.
        lock (_database)
        {
            _database.KeyDelete(key);
            if (payloads.Count == 0)
            {
                return;
            }
            var batch = _database.CreateBatch();
            var pending = new List<Task>(payloads.Count);
            foreach (var payload in payloads)
            {
                pending.Add(batch.ListLeftPushAsync(key, payload));
            }
            batch.Execute();
            _database.WaitAll(pending.ToArray());
        }
    }

    /// <summary>
    /// Deletes the hash field named after <paramref name="id"/>, trying up to three
    /// times with a one-second pause between attempts while the delete reports false.
    /// </summary>
    /// <param name="key">Redis hash key.</param>
    /// <param name="id">Identifier whose string form is the hash field name.</param>
    public async Task DeleteHashKeyAsync(string key, Guid id)
    {
        if (string.IsNullOrEmpty(key) || _database == null)
        {
            return;
        }
        const int maxAttempts = 3;
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            if (await _database.HashDeleteAsync(key, id.ToString()))
            {
                return;
            }
            // Await the pause instead of blocking the thread, and do not wait
            // after the final attempt since nothing follows it.
            if (attempt < maxAttempts)
            {
                await Task.Delay(1000);
            }
        }
    }

    /// <summary>
    /// Deletes several hash fields in one call.
    /// </summary>
    /// <param name="key">Redis hash key.</param>
    /// <param name="ids">Identifiers whose string forms are the hash field names; null or empty does nothing.</param>
    public async Task DeleteHashKeiesAsync(string key, List<Guid> ids)
    {
        if (string.IsNullOrEmpty(key) || _database == null || ids == null || ids.Count == 0)
        {
            return;
        }
        var fields = new RedisValue[ids.Count];
        for (int i = 0; i < ids.Count; i++)
        {
            fields[i] = ids[i].ToString();
        }
        await _database.HashDeleteAsync(key, fields);
    }

    /// <summary>
    /// Appends every item of <paramref name="value"/> to the tail of the list, in order.
    /// </summary>
    /// <param name="key">Redis list key.</param>
    /// <param name="value">Items to append; null or empty does nothing.</param>
    public void ListRightPush(string key, List<TEntity> value)
    {
        if (string.IsNullOrEmpty(key) || _database == null || value == null || value.Count == 0)
        {
            return;
        }
        var payloads = new List<RedisValue>(value.Count);
        foreach (var item in value)
        {
            payloads.Add(GetSerializeObjectString(item));
        }
        lock (_database)
        {
            var batch = _database.CreateBatch();
            var pending = new List<Task>(payloads.Count);
            foreach (var payload in payloads)
            {
                pending.Add(batch.ListRightPushAsync(key, payload));
            }
            batch.Execute();
            _database.WaitAll(pending.ToArray());
        }
    }

    /// <summary>
    /// Appends a single item to the tail of the list.
    /// </summary>
    /// <param name="key">Redis list key.</param>
    /// <param name="value">Item to append.</param>
    public void ListRightPush(string key, TEntity value)
    {
        if (string.IsNullOrEmpty(key) || _database == null)
        {
            return;
        }
        // A batch adds nothing for a single command, so push directly.
        RedisValue payload = GetSerializeObjectString(value);
        _database.ListRightPush(key, payload);
    }

    /// <summary>
    /// Returns the length of the list at <paramref name="key"/>.
    /// </summary>
    /// <param name="key">Redis list key.</param>
    /// <returns>The list length; 0 for an invalid key or missing database.</returns>
    public async Task<long> ListLenghtAsync(string key)
    {
        if (string.IsNullOrEmpty(key) || _database == null)
        {
            return 0;
        }
        return await _database.ListLengthAsync(key);
    }

    /// <summary>
    /// Reads and deserializes the whole list at <paramref name="key"/>.
    /// </summary>
    /// <param name="key">Redis list key.</param>
    /// <returns>The deserialized items; null for an invalid key or missing database.</returns>
    public List<TEntity> ListGet(string key)
    {
        if (string.IsNullOrEmpty(key) || _database == null)
        {
            return null;
        }
        RedisValue[] values;
        // Same lock object as ListUpdateAll, so an in-process reader does not run
        // between that method's delete and its pushes.
        lock (_database)
        {
            values = _database.ListRange(key);
        }
        return DeserializeAll(values);
    }

    /// <summary>
    /// Asynchronously reads and deserializes the whole list at <paramref name="key"/>.
    /// </summary>
    /// <param name="key">Redis list key.</param>
    /// <returns>The deserialized items; null for an invalid key or missing database.</returns>
    public async Task<List<TEntity>> ListGetAsync(string key)
    {
        if (string.IsNullOrEmpty(key) || _database == null)
        {
            return null;
        }
        var values = await _database.ListRangeAsync(key);
        // Deserialization only touches local data, so no lock is taken here.
        return DeserializeAll(values);
    }

    /// <summary>
    /// Reads every field of the hash at <paramref name="key"/> into a dictionary
    /// keyed by field name.
    /// </summary>
    /// <param name="key">Redis hash key.</param>
    /// <returns>Field name to entity map (empty when the hash has no fields); null for an invalid key or missing database.</returns>
    public Dictionary<string, TEntity> HashSetGetDicAll(string key)
    {
        if (string.IsNullOrEmpty(key) || _database == null)
        {
            return null;
        }
        HashEntry[] entries;
        lock (_database)
        {
            entries = _database.HashGetAll(key);
        }
        return ToDictionary(entries);
    }

    /// <summary>
    /// Reads one hash field whose name is the string form of <paramref name="id"/>.
    /// </summary>
    /// <param name="key">Redis hash key.</param>
    /// <param name="id">Identifier of the field.</param>
    /// <returns>The entity, or default when missing, empty, or the key is invalid.</returns>
    public TEntity HashSetGetOne(string key, Guid id)
    {
        return HashSetGetOne(key, id.ToString());
    }

    /// <summary>
    /// Reads one hash field.
    /// </summary>
    /// <param name="key">Redis hash key.</param>
    /// <param name="id">Hash field name.</param>
    /// <returns>The entity, or default when missing, empty, or the key is invalid.</returns>
    public TEntity HashSetGetOne(string key, string id)
    {
        if (string.IsNullOrEmpty(key) || _database == null)
        {
            return default;
        }
        var raw = _database.HashGet(key, id);
        return raw.IsNullOrEmpty ? default : GetDeserialize<TEntity>(raw);
    }

    /// <summary>
    /// Asynchronously reads one hash field.
    /// </summary>
    /// <param name="key">Redis hash key.</param>
    /// <param name="id">Hash field name.</param>
    /// <returns>The entity, or default when missing, empty, or the key is invalid.</returns>
    public async Task<TEntity> HashSetGetOneAsync(string key, string id)
    {
        if (string.IsNullOrEmpty(key) || _database == null)
        {
            return default;
        }
        var raw = await _database.HashGetAsync(key, id);
        return raw.IsNullOrEmpty ? default : GetDeserialize<TEntity>(raw);
    }

    /// <summary>
    /// Pushes <paramref name="entity"/> onto the head of the list. If the list
    /// already holds more than <see cref="int.MaxValue"/> items it is dropped first.
    /// </summary>
    /// <param name="key">Redis list key.</param>
    /// <param name="entity">Entity to push.</param>
    /// <returns>The list length after the push; 0 for an invalid key or missing database.</returns>
    public async Task<long> ListLeftPushAsync(string key, TEntity entity)
    {
        if (string.IsNullOrEmpty(key) || _database == null)
        {
            return 0;
        }
        var payload = GetSerializeObjectString(entity);
        if (await _database.ListLengthAsync(key) > int.MaxValue)
        {
            // If the delete reports false, fall back to a very short expiry.
            if (!await _database.KeyDeleteAsync(key))
            {
                await _database.KeyExpireAsync(key, TimeSpan.FromMilliseconds(10));
            }
        }
        return await _database.ListLeftPushAsync(key, payload);
    }

    /// <summary>
    /// Removes occurrences of the raw string <paramref name="value"/> from the list.
    /// </summary>
    /// <param name="key">Redis list key.</param>
    /// <param name="value">Raw value to remove (not serialized by this method).</param>
    /// <param name="count">Passed through unchanged as the count argument of the list-remove call.</param>
    public async Task ListRemoveStringAsync(string key, string value, long count = 0)
    {
        if (string.IsNullOrEmpty(key) || _database == null)
        {
            return;
        }
        await _database.ListRemoveAsync(key, value, count);
    }

    /// <summary>
    /// Returns the last (tail) element of the list without removing it.
    /// </summary>
    /// <param name="key">Redis list key.</param>
    /// <returns>The tail entity, or default when the list is missing/empty or the key is invalid.</returns>
    public async Task<TEntity> GetListTailDataAsync(string key)
    {
        if (string.IsNullOrEmpty(key) || _database == null)
        {
            return default;
        }
        // Index -1 addresses the last element, so one call replaces the previous
        // exists + length + index sequence and cannot race with concurrent writers.
        var tail = await _database.ListGetByIndexAsync(key, -1);
        return tail.IsNull ? default : GetDeserialize<TEntity>(tail);
    }

    /// <summary>
    /// Scans the first endpoint for keys in this repository's database and
    /// deserializes every string-typed value.
    /// </summary>
    /// <remarks>
    /// Non-string keys (lists, hashes, ...) are skipped because reading them as a
    /// string fails. NOTE(review): every string key in the database is treated as a
    /// serialized <typeparamref name="TEntity"/>; values of another shape will make
    /// deserialization throw — confirm the database is dedicated to this entity type.
    /// </remarks>
    /// <returns>The deserialized entities; null when no database is available.</returns>
    public List<TEntity> GetAll()
    {
        if (_database == null)
        {
            return null;
        }
        var entities = new List<TEntity>();
        var multiplexer = _database.Multiplexer;
        var server = multiplexer.GetServer(multiplexer.GetEndPoints().First());
        // Scan the database this repository actually uses, not the connection default.
        foreach (var redisKey in server.Keys(_database.Database))
        {
            if (_database.KeyType(redisKey) != RedisType.String)
            {
                continue;
            }
            var raw = _database.StringGet(redisKey);
            if (raw.IsNull)
            {
                // Key expired or was deleted between the scan and the read.
                continue;
            }
            entities.Add(GetDeserialize<TEntity>(raw));
        }
        return entities;
    }

    /// <summary>
    /// Reads the string value at <paramref name="key"/> and deserializes it.
    /// </summary>
    /// <param name="key">Redis key.</param>
    /// <returns>The entity, or default when the key is missing or invalid.</returns>
    public async Task<TEntity> GetOneAsync(string key)
    {
        if (string.IsNullOrEmpty(key) || _database == null)
        {
            return default;
        }
        var raw = await _database.StringGetAsync(key);
        return raw.IsNull ? default : GetDeserialize<TEntity>(raw);
    }

    /// <summary>
    /// Stores <paramref name="entity"/> under <paramref name="key"/> as a Redis string.
    /// </summary>
    /// <param name="entity">Entity to serialize and store.</param>
    /// <param name="key">Redis key.</param>
    /// <param name="ttl">Optional expiry; null stores the value without one.</param>
    /// <returns>True when the value was set; false for an invalid key or missing database.</returns>
    public async Task<bool> InsertOrUpdateOneAsync(TEntity entity, string key, TimeSpan? ttl = null)
    {
        if (string.IsNullOrEmpty(key) || _database == null)
        {
            return false;
        }
        var payload = GetSerializeObjectString(entity);
        return await _database.StringSetAsync(key, payload, ttl);
    }

    /// <summary>
    /// Serializes an entity to MessagePack bytes (despite the name, no string is produced).
    /// </summary>
    private byte[] GetSerializeObjectString(TEntity entity)
    {
        return MessagePackSerializer.Serialize(entity, options);
    }

    /// <summary>
    /// Deserializes a payload as MessagePack and, if that throws, retries by
    /// decoding the bytes as UTF-8 JSON.
    /// </summary>
    /// <param name="data">Raw payload; null or empty yields default.</param>
    /// <returns>The deserialized value. Exceptions from the JSON fallback propagate.</returns>
    private T GetDeserialize<T>(byte[] data)
    {
        if (data == null || data.Length == 0)
        {
            return default;
        }
        try
        {
            return MessagePackSerializer.Deserialize<T>(data, options);
        }
        catch (Exception ex)
        {
            // Not readable as MessagePack: try the payload as UTF-8 JSON instead.
            Debug.WriteLine("MessagePack deserialization failed, falling back to JSON: " + ex.Message);
            string jsonString = Encoding.UTF8.GetString(data);
            Debug.WriteLine("UTF8: " + jsonString);
            return JsonConvert.DeserializeObject<T>(jsonString);
        }
    }

    /// <summary>
    /// Deletes <paramref name="key"/>, retrying up to five more times with a 10 ms
    /// pause while the delete reports false.
    /// </summary>
    /// <param name="key">Redis key.</param>
    public async Task DeleteKeyAsync(string key)
    {
        if (string.IsNullOrEmpty(key) || _database == null)
        {
            return;
        }
        int retries = 0;
        while (!await _database.KeyDeleteAsync(key) && retries < 5)
        {
            retries++;
            await Task.Delay(10);
        }
    }

    /// <summary>
    /// Writes one hash field.
    /// </summary>
    /// <param name="key">Redis hash key.</param>
    /// <param name="id">Hash field name.</param>
    /// <param name="entity">Entity to store.</param>
    /// <returns>
    /// The raw result of the hash-set call, which StackExchange.Redis documents as
    /// true when a new field was created and false when an existing field was
    /// overwritten — so false does not by itself mean failure. Also false for an
    /// invalid key or missing database.
    /// </returns>
    public async Task<bool> HashSetUpdateOneAsync(string key, string id, TEntity entity)
    {
        if (string.IsNullOrEmpty(key) || _database == null)
        {
            return false;
        }
        var payload = GetSerializeObjectString(entity);
        return await _database.HashSetAsync(key, id, payload);
    }

    /// <summary>
    /// Writes several hash fields at once; <paramref name="ids"/> and
    /// <paramref name="entities"/> are matched by index.
    /// </summary>
    /// <param name="key">Redis hash key.</param>
    /// <param name="ids">Identifiers whose string forms are the field names.</param>
    /// <param name="entities">Entities to store, one per id.</param>
    /// <returns>True once the write was issued; false for invalid input (bad key, null lists, or mismatched counts).</returns>
    public Task<bool> HashSetUpdateManyAsync(string key, List<Guid> ids, List<TEntity> entities)
    {
        if (ids == null)
        {
            return Task.FromResult(false);
        }
        var fieldNames = ids.Select(id => id.ToString()).ToList();
        return HashSetUpdateManyAsync(key, fieldNames, entities);
    }

    /// <summary>
    /// Writes several hash fields at once; <paramref name="ids"/> and
    /// <paramref name="entities"/> are matched by index.
    /// </summary>
    /// <param name="key">Redis hash key.</param>
    /// <param name="ids">Hash field names.</param>
    /// <param name="entities">Entities to store, one per field name.</param>
    /// <returns>True once the write was issued; false for invalid input (bad key, null lists, or mismatched counts).</returns>
    public async Task<bool> HashSetUpdateManyAsync(string key, List<string> ids, List<TEntity> entities)
    {
        if (string.IsNullOrEmpty(key) || _database == null
            || ids == null || entities == null || ids.Count != entities.Count)
        {
            return false;
        }
        var hashEntries = new HashEntry[ids.Count];
        for (int i = 0; i < ids.Count; i++)
        {
            hashEntries[i] = new HashEntry(ids[i], GetSerializeObjectString(entities[i]));
        }
        await _database.HashSetAsync(key, hashEntries);
        return true;
    }

    /// <summary>
    /// Reads and deserializes a range of the list.
    /// </summary>
    /// <param name="key">Redis list key.</param>
    /// <param name="start">Start index, passed through to the list-range call.</param>
    /// <param name="stop">Stop index, passed through to the list-range call (default -1).</param>
    /// <returns>The deserialized items; null for an invalid key or missing database.</returns>
    public async Task<List<TEntity>> ListRangeAsync(string key, int start = 0, int stop = -1)
    {
        if (string.IsNullOrEmpty(key) || _database == null)
        {
            return null;
        }
        var values = await _database.ListRangeAsync(key, start, stop);
        return DeserializeAll(values);
    }

    /// <summary>
    /// Removes and returns the tail element of the list.
    /// </summary>
    /// <param name="key">Redis list key.</param>
    /// <returns>The popped entity, or default when nothing was popped or the key is invalid.</returns>
    public async Task<TEntity> ListRightPopAsync(string key)
    {
        if (string.IsNullOrEmpty(key) || _database == null)
        {
            return default;
        }
        var popped = await _database.ListRightPopAsync(key);
        return popped.IsNull ? default : GetDeserialize<TEntity>(popped);
    }

    /// <summary>
    /// Reads and deserializes every value of the hash at <paramref name="key"/>.
    /// </summary>
    /// <param name="key">Redis hash key.</param>
    /// <returns>
    /// The deserialized values; null when the hash has no fields, the key is
    /// invalid, or no database is available (callers must null-check).
    /// </returns>
    public async Task<List<TEntity>> HashSetGetAllAsync(string key)
    {
        if (string.IsNullOrEmpty(key) || _database == null)
        {
            return default;
        }
        var hashEntries = await _database.HashGetAllAsync(key);
        if (hashEntries.Length == 0)
        {
            return default;
        }
        var entities = new List<TEntity>(hashEntries.Length);
        foreach (var entry in hashEntries)
        {
            entities.Add(GetDeserialize<TEntity>(entry.Value));
        }
        return entities;
    }

    /// <summary>
    /// Reads a single hash field. Despite the name this returns one entity; it is
    /// equivalent to <see cref="HashSetGetOneAsync"/>.
    /// </summary>
    /// <param name="key">Redis hash key.</param>
    /// <param name="hashkey">Hash field name.</param>
    /// <returns>The entity, or default when missing, empty, or the key is invalid.</returns>
    public Task<TEntity> HashSetGetAllAsync(string key, string hashkey)
    {
        return HashSetGetOneAsync(key, hashkey);
    }

    /// <summary>
    /// Asynchronously reads every field of the hash at <paramref name="key"/> into
    /// a dictionary keyed by field name.
    /// </summary>
    /// <param name="key">Redis hash key.</param>
    /// <returns>Field name to entity map (empty when the hash has no fields); null for an invalid key or missing database.</returns>
    public async Task<Dictionary<string, TEntity>> HashSetGetDicAllAsync(string key)
    {
        if (string.IsNullOrEmpty(key) || _database == null)
        {
            return null;
        }
        var entries = await _database.HashGetAllAsync(key);
        return ToDictionary(entries);
    }

    /// <summary>
    /// Overwrites the string value at <paramref name="key"/>, keeping the key's
    /// current expiry when it has one and otherwise applying <paramref name="ttl"/>.
    /// </summary>
    /// <param name="entity">Entity to store.</param>
    /// <param name="key">Redis key.</param>
    /// <param name="ttl">Expiry used only when the key currently has none.</param>
    public async Task UpdateOneAsync(TEntity entity, string key, TimeSpan? ttl = null)
    {
        if (string.IsNullOrEmpty(key) || _database == null)
        {
            return;
        }
        var current = await _database.StringGetWithExpiryAsync(key);
        var payload = GetSerializeObjectString(entity);
        await _database.StringSetAsync(key, payload, current.Expiry ?? ttl);
    }

    /// <summary>
    /// Checks whether <paramref name="key"/> exists.
    /// </summary>
    /// <param name="key">Redis key.</param>
    /// <returns>True when the key exists; false for an invalid key or missing database.</returns>
    public async Task<bool> KeyExistsAsync(string key)
    {
        if (string.IsNullOrEmpty(key) || _database == null)
        {
            return false;
        }
        return await _database.KeyExistsAsync(key);
    }

    /// <summary>Deserializes each raw list value, preserving order.</summary>
    private List<TEntity> DeserializeAll(RedisValue[] values)
    {
        var result = new List<TEntity>(values.Length);
        foreach (var raw in values)
        {
            result.Add(GetDeserialize<TEntity>(raw));
        }
        return result;
    }

    /// <summary>
    /// Builds a field-name to entity map from hash entries, skipping entries with
    /// a null/empty name and keeping the first entry on duplicate names.
    /// </summary>
    private Dictionary<string, TEntity> ToDictionary(HashEntry[] entries)
    {
        var result = new Dictionary<string, TEntity>();
        if (entries == null)
        {
            return result;
        }
        foreach (var entry in entries)
        {
            if (entry.Name.IsNullOrEmpty)
            {
                continue;
            }
            result.TryAdd(entry.Name, GetDeserialize<TEntity>(entry.Value));
        }
        return result;
    }
}
}