data_tasks.go
· 6.1 KiB · Go
原始文件
package assets
import (
"popplio/state"
"github.com/infinitybotlist/eureka/dovewing"
jsoniter "github.com/json-iterator/go"
"go.uber.org/zap"
)
// json is the package-level JSON codec, set to jsoniter's ConfigFastest
// configuration.
var json = jsoniter.ConfigFastest
// DataTask runs a data request (DR) or data deletion request (DDR) task for
// the user identified by id and records the outcome on the tasks row taskId.
//
// taskName selects the mode: when it equals "data_delete", every row that is
// collected is also deleted and the user is cleared from the dovewing user
// cache; for any other name the data is only collected. ip is stored in the
// task output under meta.request_ip.
//
// Table fetches and deletes go through a single transaction. If the function
// returns before that transaction commits, the deferred handler sets the task
// state to "failed" through the pool directly (outside the transaction).
func DataTask(taskId, taskName, id, ip string) {
	del := taskName == "data_delete"

	var done bool

	// NOTE(review): the error from newTaskLogger is discarded; if it can
	// return an unusable logger on failure, every l.* call below is at risk.
	l, _ := newTaskLogger(taskId)

	// Mark the task as failed on any exit path that did not reach the commit.
	// Registered before the rollback defer, so it runs after the rollback.
	defer func() {
		if done {
			return
		}

		l.Error("Failed to complete task", zap.String("id", id), zap.Bool("del", del))

		_, err := state.Pool.Exec(state.Context, "UPDATE tasks SET state = $1 WHERE task_id = $2", "failed", taskId)

		if err != nil {
			l.Error("Failed to update task", zap.Error(err), zap.String("id", id), zap.Bool("del", del))
		}
	}()

	tx, err := state.Pool.Begin(state.Context)

	if err != nil {
		l.Error("Failed to begin transaction", zap.Error(err), zap.String("id", id), zap.Bool("del", del))
		return
	}

	defer tx.Rollback(state.Context)

	// Drop earlier tasks of the same kind for this user, keeping only this one.
	_, err = tx.Exec(state.Context, "DELETE FROM tasks WHERE task_name = $1 AND task_id != $2 AND for_user = $3", taskName, taskId, id)

	if err != nil {
		l.Error("Failed to delete old data tasks", zap.Error(err), zap.String("id", id), zap.Bool("del", del))
		return
	}

	l.Info("Started DR/DDR task", zap.String("id", id), zap.Bool("del", del))

	tableRefs, err := getAllTableRefs(tx)

	if err != nil {
		l.Error("Failed to get table refs", zap.Error(err), zap.String("id", id), zap.Bool("del", del))
		return
	}

	// collectedData maps table name -> rows gathered for the user.
	// cachedEntityIds maps foreign table name -> entity ids owned by the user,
	// so GetIdsForUser runs at most once per foreign table.
	collectedData := map[string][]any{}
	cachedEntityIds := map[string][]string{}

	for _, tableRef := range tableRefs {
		fOp, ok := tableOps[tableRef.ForeignTableName]

		if !ok {
			l.Warn("Cannot fetch table due to no support for its foreign ref", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id))
			continue
		}

		entityIds, cached := cachedEntityIds[tableRef.ForeignTableName]

		if !cached {
			ids, err := fOp.GetIdsForUser(tx, l, id)

			if err != nil {
				l.Error("Failed to get ids", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.Error(err))
				continue
			}

			// Assign to the outer entityIds. Declaring it with := inside this
			// branch would shadow it and leave the loop below with no ids on
			// the first (uncached) visit of each foreign table.
			entityIds = ids
			cachedEntityIds[tableRef.ForeignTableName] = ids
		}

		// The foreign table's own rows are only collected while no rows for
		// it have been stored yet.
		_, fkeysAdded := collectedData[tableRef.ForeignTableName]
		fkeysNotAdded := !fkeysAdded

		// NOTE(review): in delete mode the foreign rows are removed while the
		// first referencing table is processed; confirm later refs to the same
		// foreign table do not depend on those rows still existing.
		for _, entityId := range entityIds {
			l.Info("Fetching table", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.String("entityId", entityId))
			rows, err := fOp.Fetch(tx, l, tableRef.TableName, tableRef.ColumnName, entityId)

			if err != nil {
				l.Error("Failed to fetch table", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.Error(err))
				continue
			}

			// append on a missing key starts from a nil slice, so the key is
			// only created when at least one row exists.
			for _, row := range rows {
				collectedData[tableRef.TableName] = append(collectedData[tableRef.TableName], row)
			}

			if del {
				err = fOp.Delete(tx, l, tableRef.TableName, tableRef.ColumnName, entityId)

				if err != nil {
					l.Error("Failed to delete table", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.Error(err))
				}
			}

			if !fkeysNotAdded {
				continue
			}

			// Fetch the foreign table's data
			l.Info("Fetching foreign table", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.String("entityId", entityId))
			foreignRows, err := fOp.Fetch(tx, l, tableRef.ForeignTableName, tableRef.ForeignColumnName, entityId)

			if err != nil {
				l.Error("Failed to fetch table", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.Error(err))
				continue
			}

			for _, row := range foreignRows {
				collectedData[tableRef.ForeignTableName] = append(collectedData[tableRef.ForeignTableName], row)
			}

			if del {
				err = fOp.Delete(tx, l, tableRef.ForeignTableName, tableRef.ForeignColumnName, entityId)

				if err != nil {
					l.Error("Failed to delete foreign table", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.Error(err))
				}
			}
		}
	}

	// Delete from psql user_cache if `del` is true
	if del {
		for _, dovewingPlatform := range []*dovewing.DiscordState{state.DovewingPlatformDiscord} {
			l.Info("Deleting from user_cache [dovewing]", zap.String("id", id), zap.String("platform", dovewingPlatform.PlatformName()))
			res, err := dovewing.ClearUser(state.Context, id, dovewingPlatform, dovewing.ClearUserReq{})

			if err != nil {
				// Best effort: log and move on, but do not report success.
				l.Error("Error clearing user [dovewing]", zap.Error(err), zap.String("id", id), zap.String("platform", dovewingPlatform.PlatformName()))
				continue
			}

			l.Info("Cleared user [dovewing]", zap.String("id", id), zap.String("platform", dovewingPlatform.PlatformName()), zap.Any("res", res))
		}
	}

	finalOutput := map[string]any{
		"data": collectedData,
		"meta": map[string]any{
			"request_ip": ip,
		},
	}

	_, err = tx.Exec(state.Context, "UPDATE tasks SET output = $1, state = $2 WHERE task_id = $3", finalOutput, "completed", taskId)

	if err != nil {
		l.Error("Failed to update task", zap.Error(err), zap.String("id", id), zap.Bool("del", del))
		return
	}

	err = tx.Commit(state.Context)

	if err != nil {
		l.Error("Failed to commit transaction", zap.Error(err), zap.String("id", id), zap.Bool("del", del))
		return
	}

	done = true
}
1 | package assets |
2 | |
3 | import ( |
4 | "popplio/state" |
5 | |
6 | "github.com/infinitybotlist/eureka/dovewing" |
7 | jsoniter "github.com/json-iterator/go" |
8 | "go.uber.org/zap" |
9 | ) |
10 | |
11 | var json = jsoniter.ConfigFastest |
12 | |
13 | func DataTask(taskId, taskName, id, ip string) { |
14 | del := taskName == "data_delete" |
15 | |
16 | var done bool |
17 | |
18 | l, _ := newTaskLogger(taskId) |
19 | |
20 | // Fail failed tasks |
21 | defer func() { |
22 | if !done { |
23 | l.Error("Failed to complete task", zap.String("id", id), zap.Bool("del", del)) |
24 | |
25 | _, err := state.Pool.Exec(state.Context, "UPDATE tasks SET state = $1 WHERE task_id = $2", "failed", taskId) |
26 | |
27 | if err != nil { |
28 | l.Error("Failed to update task", zap.Error(err), zap.String("id", id), zap.Bool("del", del)) |
29 | } |
30 | } |
31 | }() |
32 | |
33 | tx, err := state.Pool.Begin(state.Context) |
34 | |
35 | if err != nil { |
36 | l.Error("Failed to begin transaction", zap.Error(err), zap.String("id", id), zap.Bool("del", del)) |
37 | return |
38 | } |
39 | |
40 | defer tx.Rollback(state.Context) |
41 | |
42 | _, err = tx.Exec(state.Context, "DELETE FROM tasks WHERE task_name = $1 AND task_id != $2 AND for_user = $3", taskName, taskId, id) |
43 | |
44 | if err != nil { |
45 | l.Error("Failed to delete old data tasks", zap.Error(err), zap.String("id", id), zap.Bool("del", del)) |
46 | return |
47 | } |
48 | |
49 | l.Info("Started DR/DDR task", zap.String("id", id), zap.Bool("del", del)) |
50 | |
51 | tableRefs, err := getAllTableRefs(tx) |
52 | |
53 | if err != nil { |
54 | l.Error("Failed to get table refs", zap.Error(err), zap.String("id", id), zap.Bool("del", del)) |
55 | return |
56 | } |
57 | |
58 | // Begin fetching data recursively |
59 | collectedData := map[string][]any{} |
60 | cachedEntityIds := map[string][]string{} |
61 | for _, tableRef := range tableRefs { |
62 | fOp, ok := tableOps[tableRef.ForeignTableName] |
63 | |
64 | if !ok { |
65 | l.Warn("Cannot fetch table due to no support for its foreign ref", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id)) |
66 | continue |
67 | } |
68 | |
69 | var entityIds []string |
70 | |
71 | if ids, ok := cachedEntityIds[tableRef.ForeignTableName]; ok { |
72 | entityIds = ids |
73 | } else { |
74 | entityIds, err := fOp.GetIdsForUser(tx, l, id) |
75 | |
76 | if err != nil { |
77 | l.Error("Failed to get ids", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.Error(err)) |
78 | continue |
79 | } |
80 | |
81 | cachedEntityIds[tableRef.ForeignTableName] = entityIds |
82 | } |
83 | |
84 | var fkeysNotAdded bool |
85 | if _, ok := collectedData[tableRef.ForeignTableName]; !ok { |
86 | fkeysNotAdded = true |
87 | } |
88 | |
89 | // Handle the entities now |
90 | for _, entityId := range entityIds { |
91 | l.Info("Fetching table", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.String("entityId", entityId)) |
92 | rows, err := fOp.Fetch(tx, l, tableRef.TableName, tableRef.ColumnName, entityId) |
93 | |
94 | if err != nil { |
95 | l.Error("Failed to fetch table", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.Error(err)) |
96 | continue |
97 | } |
98 | |
99 | for _, row := range rows { |
100 | if _, ok := collectedData[tableRef.TableName]; !ok { |
101 | collectedData[tableRef.TableName] = []any{} |
102 | } |
103 | |
104 | collectedData[tableRef.TableName] = append(collectedData[tableRef.TableName], row) |
105 | } |
106 | |
107 | if del { |
108 | err = fOp.Delete(tx, l, tableRef.TableName, tableRef.ColumnName, entityId) |
109 | |
110 | if err != nil { |
111 | l.Error("Failed to delete table", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.Error(err)) |
112 | } |
113 | } |
114 | |
115 | // Fetch the foreign table's data |
116 | if fkeysNotAdded { |
117 | l.Info("Fetching foreign table", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.String("entityId", entityId)) |
118 | rows, err := fOp.Fetch(tx, l, tableRef.ForeignTableName, tableRef.ForeignColumnName, entityId) |
119 | |
120 | if err != nil { |
121 | l.Error("Failed to fetch table", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.Error(err)) |
122 | continue |
123 | } |
124 | |
125 | for _, row := range rows { |
126 | if _, ok := collectedData[tableRef.ForeignTableName]; !ok { |
127 | collectedData[tableRef.ForeignTableName] = []any{} |
128 | } |
129 | |
130 | collectedData[tableRef.ForeignTableName] = append(collectedData[tableRef.ForeignTableName], row) |
131 | } |
132 | |
133 | if del { |
134 | err = fOp.Delete(tx, l, tableRef.ForeignTableName, tableRef.ForeignColumnName, entityId) |
135 | |
136 | if err != nil { |
137 | l.Error("Failed to delete foreign table", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.Error(err)) |
138 | } |
139 | } |
140 | } |
141 | } |
142 | } |
143 | |
144 | // Delete from psql user_cache if `del` is true |
145 | if del { |
146 | for _, dovewingPlatform := range []*dovewing.DiscordState{state.DovewingPlatformDiscord} { |
147 | l.Info("Deleting from user_cache [dovewing]", zap.String("id", id), zap.String("platform", dovewingPlatform.PlatformName())) |
148 | res, err := dovewing.ClearUser(state.Context, id, dovewingPlatform, dovewing.ClearUserReq{}) |
149 | |
150 | if err != nil { |
151 | l.Error("Error clearing user [dovewing]", zap.Error(err), zap.String("id", id), zap.String("platform", dovewingPlatform.PlatformName())) |
152 | } |
153 | |
154 | l.Info("Cleared user [dovewing]", zap.String("id", id), zap.String("platform", dovewingPlatform.PlatformName()), zap.Any("res", res)) |
155 | } |
156 | } |
157 | |
158 | finalOutput := map[string]any{ |
159 | "data": collectedData, |
160 | "meta": map[string]any{ |
161 | "request_ip": ip, |
162 | }, |
163 | } |
164 | |
165 | _, err = tx.Exec(state.Context, "UPDATE tasks SET output = $1, state = $2 WHERE task_id = $3", finalOutput, "completed", taskId) |
166 | |
167 | if err != nil { |
168 | l.Error("Failed to update task", zap.Error(err), zap.String("id", id), zap.Bool("del", del)) |
169 | return |
170 | } |
171 | |
172 | err = tx.Commit(state.Context) |
173 | |
174 | if err != nil { |
175 | l.Error("Failed to commit transaction", zap.Error(err), zap.String("id", id), zap.Bool("del", del)) |
176 | return |
177 | } |
178 | |
179 | done = true |
180 | } |
181 |