 	"slices"
 	"sync"
 	"time"
+	"unsafe"
 
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/redact"
@@ -96,45 +97,75 @@ type CategoryStatsAggregate struct {
 	CategoryStats CategoryStats
 }
 
+const numCategoryStatsShards = 8
+
 type categoryStatsWithMu struct {
 	mu sync.Mutex
 	// Protected by mu.
-	stats CategoryStatsAggregate
+	stats CategoryStats
 }
 
 // CategoryStatsCollector collects and aggregates the stats per category.
 type CategoryStatsCollector struct {
 	// mu protects additions to statsMap.
 	mu sync.Mutex
-	// Category => categoryStatsWithMu.
+	// Category => *shardedCategoryStats.
 	statsMap sync.Map
 }
 
+// shardedCategoryStats accumulates stats for a category, splitting its stats
+// across multiple shards to prevent mutex contention. In high-read workloads,
+// contention on the category stats mutex has been observed.
+type shardedCategoryStats struct {
+	Category Category
+	QoSLevel QoSLevel
+	shards   [numCategoryStatsShards]struct {
+		categoryStatsWithMu
+		// Pad each shard to 64 bytes so they don't share a cache line.
+		_ [64 - unsafe.Sizeof(categoryStatsWithMu{})]byte
+	}
+}
+
+// getStats retrieves the aggregated stats for the category, summing across all
+// shards.
+func (s *shardedCategoryStats) getStats() CategoryStatsAggregate {
+	agg := CategoryStatsAggregate{
+		Category: s.Category,
+		QoSLevel: s.QoSLevel,
+	}
+	for i := range s.shards {
+		s.shards[i].mu.Lock()
+		agg.CategoryStats.aggregate(s.shards[i].stats)
+		s.shards[i].mu.Unlock()
+	}
+	return agg
+}
+
 func (c *CategoryStatsCollector) reportStats(
-	category Category, qosLevel QoSLevel, stats CategoryStats,
+	p uintptr, category Category, qosLevel QoSLevel, stats CategoryStats,
 ) {
 	v, ok := c.statsMap.Load(category)
 	if !ok {
 		c.mu.Lock()
-		v, _ = c.statsMap.LoadOrStore(category, &categoryStatsWithMu{
-			stats: CategoryStatsAggregate{Category: category, QoSLevel: qosLevel},
+		v, _ = c.statsMap.LoadOrStore(category, &shardedCategoryStats{
+			Category: category,
+			QoSLevel: qosLevel,
 		})
 		c.mu.Unlock()
 	}
-	aggStats := v.(*categoryStatsWithMu)
-	aggStats.mu.Lock()
-	aggStats.stats.CategoryStats.aggregate(stats)
-	aggStats.mu.Unlock()
+
+	shardedStats := v.(*shardedCategoryStats)
+	s := (p >> 8) & (numCategoryStatsShards - 1)
+	shardedStats.shards[s].mu.Lock()
+	shardedStats.shards[s].stats.aggregate(stats)
+	shardedStats.shards[s].mu.Unlock()
 }
 
 // GetStats returns the aggregated stats.
 func (c *CategoryStatsCollector) GetStats() []CategoryStatsAggregate {
 	var stats []CategoryStatsAggregate
 	c.statsMap.Range(func(_, v any) bool {
-		aggStats := v.(*categoryStatsWithMu)
-		aggStats.mu.Lock()
-		s := aggStats.stats
-		aggStats.mu.Unlock()
+		s := v.(*shardedCategoryStats).getStats()
 		if len(s.Category) == 0 {
 			s.Category = "_unknown"
 		}
@@ -175,6 +206,6 @@ func (accum *iterStatsAccumulator) reportStats(
 
 func (accum *iterStatsAccumulator) close() {
 	if accum.collector != nil {
-		accum.collector.reportStats(accum.Category, accum.QoSLevel, accum.stats)
+		accum.collector.reportStats(uintptr(unsafe.Pointer(accum)), accum.Category, accum.QoSLevel, accum.stats)
 	}
 }
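
For reference, the sharding pattern in this diff is small enough to stand alone: stats are split across cache-line-padded shards, the reporting object's own heap address picks the shard, and reads sum over all shards. The sketch below is a minimal, self-contained illustration of that pattern, not Pebble code; ShardedCounter, Add, and Total are hypothetical names. It assumes a power-of-two shard count so a mask can replace a modulo, and it shifts the address right before masking because Go heap pointers are at least 8-byte aligned, so the lowest bits carry no entropy.

package main

import (
	"fmt"
	"sync"
	"unsafe"
)

const numShards = 8 // power of two, so "& (numShards - 1)" can replace "%"

type shard struct {
	mu    sync.Mutex
	total uint64 // protected by mu
}

// ShardedCounter splits one logical counter across shards so that
// concurrent writers rarely contend on the same mutex or cache line.
type ShardedCounter struct {
	shards [numShards]struct {
		shard
		// Pad each shard to 64 bytes so adjacent shards never share a
		// cache line (avoids false sharing between writers).
		_ [64 - unsafe.Sizeof(shard{})]byte
	}
}

// Add credits n to the shard chosen from the caller-supplied address.
// Heap pointers are at least 8-byte aligned, so the low bits are shifted
// away first; masking the raw address would map every caller to shard 0.
func (c *ShardedCounter) Add(p uintptr, n uint64) {
	s := (p >> 8) & (numShards - 1)
	c.shards[s].mu.Lock()
	c.shards[s].total += n
	c.shards[s].mu.Unlock()
}

// Total sums all shards, locking each one only briefly.
func (c *ShardedCounter) Total() uint64 {
	var t uint64
	for i := range c.shards {
		c.shards[i].mu.Lock()
		t += c.shards[i].total
		c.shards[i].mu.Unlock()
	}
	return t
}

func main() {
	var c ShardedCounter
	var wg sync.WaitGroup
	for i := 0; i < 64; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			acc := new(uint64) // stand-in for a per-iterator accumulator
			c.Add(uintptr(unsafe.Pointer(acc)), 1)
		}()
	}
	wg.Wait()
	fmt.Println(c.Total()) // prints 64
}

The padding trades a little memory per category for the guarantee that two shards never share a 64-byte cache line, so writers on different shards do not invalidate each other's caches.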