Synopsis

An invariant is a property of the application that should always be true. In the context of the Cosmos SDK, an Invariant is a function that checks for a particular invariant. These functions are useful to detect bugs early on and act upon them to limit their potential consequences (e.g. by halting the chain). They are also useful in the development process of the application to detect bugs via simulations.
Pre-requisite Readings

Implementing Invariants

An Invariant is a function that checks for a particular invariant within a module. Module Invariants must follow the Invariant type:
package types

import "fmt"

/ An Invariant is a function which tests a particular invariant.
/ The invariant returns a descriptive message about what happened
/ and a boolean indicating whether the invariant has been broken.
/ The simulator will then halt and print the logs.
type Invariant func(ctx Context) (string, bool)

/ Invariants defines a group of invariants
type Invariants []Invariant

/ expected interface for registering invariants
type InvariantRegistry interface {
    RegisterRoute(moduleName, route string, invar Invariant)
}

/ FormatInvariant returns a standardized invariant message.
func FormatInvariant(module, name, msg string)

string {
    return fmt.Sprintf("%s: %s invariant\n%s\n", module, name, msg)
}
The string return value is the invariant message, which can be used when printing logs, and the bool return value is the actual result of the invariant check. In practice, each module implements Invariants in a keeper/invariants.go file within the module’s folder. The standard is to implement one Invariant function per logical grouping of invariants with the following model:
/ Example for an Invariant that checks balance-related invariants

func BalanceInvariants(k Keeper)

sdk.Invariant {
    return func(ctx context.Context) (string, bool) {
        / Implement checks for balance-related invariants
}
}
Additionally, module developers should generally implement an AllInvariants function that runs all the Invariants functions of the module:
/ AllInvariants runs all invariants of the module.
/ In this example, the module implements two Invariants: BalanceInvariants and DepositsInvariants

func AllInvariants(k Keeper)

sdk.Invariant {
    return func(ctx context.Context) (string, bool) {
    res, stop := BalanceInvariants(k)(ctx)
    if stop {
    return res, stop
}

return DepositsInvariant(k)(ctx)
}
}
Finally, module developers need to implement the RegisterInvariants method as part of the AppModule interface. Indeed, the RegisterInvariants method of the module, implemented in the module/module.go file, typically only defers the call to a RegisterInvariants method implemented in the keeper/invariants.go file. The RegisterInvariants method registers a route for each Invariant function in the InvariantRegistry:
package keeper

import (

	"bytes"
    "fmt"
    "cosmossdk.io/math"
	sdk "github.com/cosmos/cosmos-sdk/types"
    "github.com/cosmos/cosmos-sdk/x/staking/types"
)

/ RegisterInvariants registers all staking invariants
func RegisterInvariants(ir sdk.InvariantRegistry, k *Keeper) {
    ir.RegisterRoute(types.ModuleName, "module-accounts",
		ModuleAccountInvariants(k))

ir.RegisterRoute(types.ModuleName, "nonnegative-power",
		NonNegativePowerInvariant(k))

ir.RegisterRoute(types.ModuleName, "positive-delegation",
		PositiveDelegationInvariant(k))

ir.RegisterRoute(types.ModuleName, "delegator-shares",
		DelegatorSharesInvariant(k))
}

/ AllInvariants runs all invariants of the staking module.
func AllInvariants(k *Keeper)

sdk.Invariant {
    return func(ctx sdk.Context) (string, bool) {
    res, stop := ModuleAccountInvariants(k)(ctx)
    if stop {
    return res, stop
}

res, stop = NonNegativePowerInvariant(k)(ctx)
    if stop {
    return res, stop
}

res, stop = PositiveDelegationInvariant(k)(ctx)
    if stop {
    return res, stop
}

return DelegatorSharesInvariant(k)(ctx)
}
}

/ ModuleAccountInvariants checks that the bonded and notBonded ModuleAccounts pools
/ reflects the tokens actively bonded and not bonded
func ModuleAccountInvariants(k *Keeper)

sdk.Invariant {
    return func(ctx sdk.Context) (string, bool) {
    bonded := math.ZeroInt()
    notBonded := math.ZeroInt()
    bondedPool := k.GetBondedPool(ctx)
    notBondedPool := k.GetNotBondedPool(ctx)
    bondDenom := k.BondDenom(ctx)

k.IterateValidators(ctx, func(_ int64, validator types.ValidatorI)

bool {
    switch validator.GetStatus() {
    case types.Bonded:
				bonded = bonded.Add(validator.GetTokens())
    case types.Unbonding, types.Unbonded:
				notBonded = notBonded.Add(validator.GetTokens())

default:
				panic("invalid validator status")
}

return false
})

k.IterateUnbondingDelegations(ctx, func(_ int64, ubd types.UnbondingDelegation)

bool {
    for _, entry := range ubd.Entries {
    notBonded = notBonded.Add(entry.Balance)
}

return false
})
    poolBonded := k.bankKeeper.GetBalance(ctx, bondedPool.GetAddress(), bondDenom)
    poolNotBonded := k.bankKeeper.GetBalance(ctx, notBondedPool.GetAddress(), bondDenom)
    broken := !poolBonded.Amount.Equal(bonded) || !poolNotBonded.Amount.Equal(notBonded)

		/ Bonded tokens should equal sum of tokens with bonded validators
		/ Not-bonded tokens should equal unbonding delegations	plus tokens on unbonded validators
		return sdk.FormatInvariant(types.ModuleName, "bonded and not bonded module account coins", fmt.Sprintf(
			"\tPool's bonded tokens: %v\n"+
				"\tsum of bonded tokens: %v\n"+
				"not bonded token invariance:\n"+
				"\tPool's not bonded tokens: %v\n"+
				"\tsum of not bonded tokens: %v\n"+
				"module accounts total (bonded + not bonded):\n"+
				"\tModule Accounts' tokens: %v\n"+
				"\tsum tokens:              %v\n",
			poolBonded, bonded, poolNotBonded, notBonded, poolBonded.Add(poolNotBonded), bonded.Add(notBonded))), broken
}
}

/ NonNegativePowerInvariant checks that all stored validators have >= 0 power.
func NonNegativePowerInvariant(k *Keeper)

sdk.Invariant {
    return func(ctx sdk.Context) (string, bool) {
    var (
			msg    string
			broken bool
		)
    iterator := k.ValidatorsPowerStoreIterator(ctx)
    for ; iterator.Valid(); iterator.Next() {
    validator, found := k.GetValidator(ctx, iterator.Value())
    if !found {
    panic(fmt.Sprintf("validator record not found for address: %X\n", iterator.Value()))
}
    powerKey := types.GetValidatorsByPowerIndexKey(validator, k.PowerReduction(ctx))
    if !bytes.Equal(iterator.Key(), powerKey) {
    broken = true
				msg += fmt.Sprintf("power store invariance:\n\tvalidator.Power: %v"+
					"\n\tkey should be: %v\n\tkey in store: %v\n",
					validator.GetConsensusPower(k.PowerReduction(ctx)), powerKey, iterator.Key())
}
    if validator.Tokens.IsNegative() {
    broken = true
				msg += fmt.Sprintf("\tnegative tokens for validator: %v\n", validator)
}

}

iterator.Close()

return sdk.FormatInvariant(types.ModuleName, "nonnegative power", fmt.Sprintf("found invalid validator powers\n%s", msg)), broken
}
}

/ PositiveDelegationInvariant checks that all stored delegations have > 0 shares.
func PositiveDelegationInvariant(k *Keeper)

sdk.Invariant {
    return func(ctx sdk.Context) (string, bool) {
    var (
			msg   string
			count int
		)
    delegations := k.GetAllDelegations(ctx)
    for _, delegation := range delegations {
    if delegation.Shares.IsNegative() {
    count++
				msg += fmt.Sprintf("\tdelegation with negative shares: %+v\n", delegation)
}
    if delegation.Shares.IsZero() {
    count++
				msg += fmt.Sprintf("\tdelegation with zero shares: %+v\n", delegation)
}

}
    broken := count != 0

		return sdk.FormatInvariant(types.ModuleName, "positive delegations", fmt.Sprintf(
			"%d invalid delegations found\n%s", count, msg)), broken
}
}

/ DelegatorSharesInvariant checks whether all the delegator shares which persist
/ in the delegator object add up to the correct total delegator shares
/ amount stored in each validator.
func DelegatorSharesInvariant(k *Keeper)

sdk.Invariant {
    return func(ctx sdk.Context) (string, bool) {
    var (
			msg    string
			broken bool
		)
    validators := k.GetAllValidators(ctx)
    validatorsDelegationShares := map[string]math.LegacyDec{
}

		/ initialize a map: validator -> its delegation shares
    for _, validator := range validators {
    validatorsDelegationShares[validator.GetOperator().String()] = math.LegacyZeroDec()
}

		/ iterate through all the delegations to calculate the total delegation shares for each validator
    delegations := k.GetAllDelegations(ctx)
    for _, delegation := range delegations {
    delegationValidatorAddr := delegation.GetValidatorAddr().String()
    validatorDelegationShares := validatorsDelegationShares[delegationValidatorAddr]
			validatorsDelegationShares[delegationValidatorAddr] = validatorDelegationShares.Add(delegation.Shares)
}

		/ for each validator, check if its total delegation shares calculated from the step above equals to its expected delegation shares
    for _, validator := range validators {
    expValTotalDelShares := validator.GetDelegatorShares()
    calculatedValTotalDelShares := validatorsDelegationShares[validator.GetOperator().String()]
    if !calculatedValTotalDelShares.Equal(expValTotalDelShares) {
    broken = true
				msg += fmt.Sprintf("broken delegator shares invariance:\n"+
					"\tvalidator.DelegatorShares: %v\n"+
					"\tsum of Delegator.Shares: %v\n", expValTotalDelShares, calculatedValTotalDelShares)
}

}

return sdk.FormatInvariant(types.ModuleName, "delegator shares", msg), broken
}
}
For more, see an example of Invariants implementation from the staking module.

Invariant Registry

The InvariantRegistry is a registry where the Invariants of all the modules of an application are registered. There is only one InvariantRegistry per application, meaning module developers need not implement their own InvariantRegistry when building a module. All module developers need to do is to register their modules’ invariants in the InvariantRegistry, as explained in the section above. The rest of this section gives more information on the InvariantRegistry itself, and does not contain anything directly relevant to module developers. At its core, the InvariantRegistry is defined in the Cosmos SDK as an interface:
package types

import "fmt"

/ An Invariant is a function which tests a particular invariant.
/ The invariant returns a descriptive message about what happened
/ and a boolean indicating whether the invariant has been broken.
/ The simulator will then halt and print the logs.
type Invariant func(ctx Context) (string, bool)

/ Invariants defines a group of invariants
type Invariants []Invariant

/ expected interface for registering invariants
type InvariantRegistry interface {
    RegisterRoute(moduleName, route string, invar Invariant)
}

/ FormatInvariant returns a standardized invariant message.
func FormatInvariant(module, name, msg string)

string {
    return fmt.Sprintf("%s: %s invariant\n%s\n", module, name, msg)
}
Typically, this interface is implemented in the keeper of a specific module. The most used implementation of an InvariantRegistry can be found in the crisis module:
package keeper

import (

	"context"
    "fmt"
    "time"
    "cosmossdk.io/collections"
    "cosmossdk.io/core/address"
    "cosmossdk.io/log"

	storetypes "cosmossdk.io/core/store"
    "github.com/cosmos/cosmos-sdk/codec"
	sdk "github.com/cosmos/cosmos-sdk/types"
    "github.com/cosmos/cosmos-sdk/x/crisis/types"
)

/ Keeper - crisis keeper
type Keeper struct {
    routes         []types.InvarRoute
	invCheckPeriod uint
	storeService   storetypes.KVStoreService
	cdc            codec.BinaryCodec

	/ the address capable of executing a MsgUpdateParams message. Typically, this
	/ should be the x/gov module account.
	authority string

	supplyKeeper types.SupplyKeeper

	feeCollectorName string / name of the FeeCollector ModuleAccount

	addressCodec address.Codec

	Schema      collections.Schema
	ConstantFee collections.Item[sdk.Coin]
}

/ NewKeeper creates a new Keeper object
func NewKeeper(
	cdc codec.BinaryCodec, storeService storetypes.KVStoreService, invCheckPeriod uint,
	supplyKeeper types.SupplyKeeper, feeCollectorName, authority string, ac address.Codec,
) *Keeper {
    sb := collections.NewSchemaBuilder(storeService)
    k := &Keeper{
    storeService:     storeService,
		cdc:              cdc,
		routes:           make([]types.InvarRoute, 0),
		invCheckPeriod:   invCheckPeriod,
		supplyKeeper:     supplyKeeper,
		feeCollectorName: feeCollectorName,
		authority:        authority,
		addressCodec:     ac,
    ConstantFee: collections.NewItem(sb, types.ConstantFeeKey, "constant_fee", codec.CollValue[sdk.Coin](/docs/sdk/next/documentation/module-system/cdc)),
}

schema, err := sb.Build()
    if err != nil {
    panic(err)
}

k.Schema = schema
	return k
}

/ GetAuthority returns the x/crisis module's authority.
func (k *Keeper)

GetAuthority()

string {
    return k.authority
}

/ Logger returns a module-specific logger.
func (k *Keeper)

Logger(ctx context.Context)

log.Logger {
    sdkCtx := sdk.UnwrapSDKContext(ctx)

return sdkCtx.Logger().With("module", "x/"+types.ModuleName)
}

/ RegisterRoute register the routes for each of the invariants
func (k *Keeper)

RegisterRoute(moduleName, route string, invar sdk.Invariant) {
    invarRoute := types.NewInvarRoute(moduleName, route, invar)

k.routes = append(k.routes, invarRoute)
}

/ Routes - return the keeper's invariant routes
func (k *Keeper)

Routes() []types.InvarRoute {
    return k.routes
}

/ Invariants returns a copy of all registered Crisis keeper invariants.
func (k *Keeper)

Invariants() []sdk.Invariant {
    invars := make([]sdk.Invariant, len(k.routes))
    for i, route := range k.routes {
    invars[i] = route.Invar
}

return invars
}

/ AssertInvariants asserts all registered invariants. If any invariant fails,
/ the method panics.
func (k *Keeper)

AssertInvariants(ctx sdk.Context) {
    logger := k.Logger(ctx)
    start := time.Now()
    invarRoutes := k.Routes()
    n := len(invarRoutes)
    for i, ir := range invarRoutes {
    logger.Info("asserting crisis invariants", "inv", fmt.Sprint(i+1, "/", n), "name", ir.FullRoute())

invCtx, _ := ctx.CacheContext()
    if res, stop := ir.Invar(invCtx); stop {
			/ TODO: Include app name as part of context to allow for this to be
			/ variable.
			panic(fmt.Errorf("invariant broken: %s\n"+
				"\tCRITICAL please submit the following transaction:\n"+
				"\t\t tx crisis invariant-broken %s %s", res, ir.ModuleName, ir.Route))
}

}
    diff := time.Since(start)

logger.Info("asserted all invariants", "duration", diff, "height", ctx.BlockHeight())
}

/ InvCheckPeriod returns the invariant checks period.
func (k *Keeper)

InvCheckPeriod()

uint {
    return k.invCheckPeriod
}

/ SendCoinsFromAccountToFeeCollector transfers amt to the fee collector account.
func (k *Keeper)

SendCoinsFromAccountToFeeCollector(ctx context.Context, senderAddr sdk.AccAddress, amt sdk.Coins)

error {
    return k.supplyKeeper.SendCoinsFromAccountToModule(ctx, senderAddr, k.feeCollectorName, amt)
}
The InvariantRegistry is therefore typically instantiated by instantiating the keeper of the crisis module in the application’s constructor function. Invariants can be checked manually via messages, but most often they are checked automatically at the end of each block. Here is an example from the crisis module:
package crisis

import (

	"context"
    "time"
    "github.com/cosmos/cosmos-sdk/telemetry"
	sdk "github.com/cosmos/cosmos-sdk/types"
    "github.com/cosmos/cosmos-sdk/x/crisis/keeper"
    "github.com/cosmos/cosmos-sdk/x/crisis/types"
)

/ check all registered invariants
func EndBlocker(ctx context.Context, k keeper.Keeper) {
    defer telemetry.ModuleMeasureSince(types.ModuleName, time.Now(), telemetry.MetricKeyEndBlocker)
    sdkCtx := sdk.UnwrapSDKContext(ctx)
    if k.InvCheckPeriod() == 0 || sdkCtx.BlockHeight()%int64(k.InvCheckPeriod()) != 0 {
		/ skip running the invariant check
		return
}

k.AssertInvariants(sdkCtx)
}
In both cases, if one of the Invariants returns false, the InvariantRegistry can trigger special logic (e.g. have the application panic and print the Invariants message in the log).